Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
ApacheFetch waits for asynchronous processing inside PSOL. Currently …
…that wait has no timeout, but as described in #1048 PSOL sometimes fails to return.  In Nginx and other event-loop based servers this causes a problem for that single request but doesn't take down the server, but in Apache we currently tie up a thread indefinitely waiting for the fetch to finish.  This change makes Apache act more like the other servers: after a 2min timeout it gives up waiting on PSOL and frees up the thread.

This doesn't solve the underlying PSOL issue(s), but it makes it much less damaging to Apache servers.
  • Loading branch information
jeffkaufman authored and crowell committed Jul 13, 2015
1 parent 9a37778 commit e59644f
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 74 deletions.
217 changes: 167 additions & 50 deletions net/instaweb/apache/instaweb_handler.cc
Expand Up @@ -57,6 +57,7 @@
#include "net/instaweb/util/public/statistics.h"
#include "net/instaweb/util/public/string_writer.h"
#include "net/instaweb/util/public/timer.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/ref_counted_ptr.h"
#include "pagespeed/kernel/http/http_options.h"
#include "pagespeed/kernel/http/response_headers.h"
Expand Down Expand Up @@ -93,24 +94,30 @@ const char kResourceUrlYes[] = "<YES>";
// instance).
const size_t kMaxPostSizeBytes = 131072;

// How long to try waiting before giving up.
const int kMaxWaitMs = 2 * Timer::kMinuteMs;

} // namespace

ApacheFetch::ApacheFetch(const GoogleString& mapped_url,
StringPiece debug_info,
ServerContext* server_context,
request_rec* request,
const RequestContextPtr& request_context,
const RewriteOptions* options)
: AsyncFetchUsingWriter(request_context, &apache_writer_),
: AsyncFetch(request_context),
mapped_url_(mapped_url),
apache_writer_(request),
options_(options),
server_context_(server_context),
mutex_(server_context->thread_system()->NewMutex()),
condvar_(mutex_->NewCondvar()),
abandoned_(false),
done_(false),
headers_sent_(false),
handle_error_(true),
status_ok_(false),
is_proxy_(false),
options_(options),
blocking_fetch_timeout_ms_(options_->blocking_fetch_timeout_ms()) {
// We are proxying content, and the caching in the http configuration
// should not apply; we want to use the caching from the proxy.
Expand All @@ -119,6 +126,7 @@ ApacheFetch::ApacheFetch(const GoogleString& mapped_url,
ApacheRequestToRequestHeaders(*request, request_headers());
request_headers()->RemoveAll(HttpAttributes::kCookie);
request_headers()->RemoveAll(HttpAttributes::kCookie2);
debug_info.CopyToString(&debug_info_);
}

ApacheFetch::~ApacheFetch() {
Expand Down Expand Up @@ -158,9 +166,6 @@ void ApacheFetch::HandleHeadersComplete() {
}

// TODO(sligocki): Add X-Mod-Pagespeed header.
if (content_length_known() && !inject_error_message) {
apache_writer_.set_content_length(content_length());
}

// Default cache-control to nocache.
if (!response_headers()->Has(HttpAttributes::kCacheControl)) {
Expand All @@ -169,49 +174,139 @@ void ApacheFetch::HandleHeadersComplete() {
}
response_headers()->ComputeCaching();

apache_writer_.OutputHeaders(response_headers());
{
ScopedMutex lock(mutex_.get());
if (!abandoned_) {
if (content_length_known() && !inject_error_message) {
apache_writer_.set_content_length(content_length());
}

apache_writer_.OutputHeaders(response_headers());
headers_sent_ = true;

if (inject_error_message) {
apache_writer_.Write("Missing Content-Type required for proxied "
"resource", server_context_->message_handler());
apache_writer_.set_squelch_output(true);
if (inject_error_message) {
apache_writer_.Write("Missing Content-Type required for proxied "
"resource", server_context_->message_handler());
apache_writer_.set_squelch_output(true);
}
return;
}
}
server_context_->message_handler()->Message(
kWarning,
"HeadersComplete for url %s received after "
"being abandoned for timing out.",
mapped_url_.c_str());
return; // Don't do anything.
}
}

void ApacheFetch::HandleDone(bool success) {
ScopedMutex lock(mutex_.get());
done_ = true;
if (status_ok_ && !success) {
{
ScopedMutex lock(mutex_.get());
done_ = true;

if (!abandoned_) {
if (status_ok_ && !success) {
server_context_->message_handler()->Message(
kWarning,
"Response for url %s issued with status %d %s but "
"failed to complete.",
mapped_url_.c_str(), response_headers()->status_code(),
response_headers()->reason_phrase());
}
// We've not been abandoned; let our owner know we're done and they will
// delete us.
condvar_->Signal();
return;
}
server_context_->message_handler()->Message(
kWarning,
"Response for url %s issued with status %d %s but "
"failed to complete.",
"Response for url %s completed with status %d %s after "
"being abandoned for timing out.",
mapped_url_.c_str(), response_headers()->status_code(),
response_headers()->reason_phrase());
}
condvar_->Signal();
// We've been abandoned, so we have ownership of ourself.
delete this;
}

bool ApacheFetch::HandleWrite(const StringPiece& sp,
MessageHandler* handler) {
{
ScopedMutex lock(mutex_.get());
if (!abandoned_) {
return apache_writer_.Write(sp, handler);
}
}
handler->Message(kWarning,
"Write of %ld bytes for url %s received after "
"being abandoned for timing out.",
sp.size(), mapped_url_.c_str());
return false; // Drop the write.
}

void ApacheFetch::Wait() {
bool ApacheFetch::HandleFlush(MessageHandler* handler) {
{
ScopedMutex lock(mutex_.get());
if (!abandoned_) {
return apache_writer_.Flush(handler);
}
}
handler->Message(kWarning,
"Flush for url %s received after "
"being abandoned for timing out.",
mapped_url_.c_str());
return false; // Drop the flush.
}

ApacheFetch::WaitResult ApacheFetch::Wait(const RewriteDriver& rewrite_driver) {
MessageHandler* handler = server_context_->message_handler();
Timer* timer = server_context_->timer();
int64 start_ms = timer->NowMs();
{
ScopedMutex lock(mutex_.get());
while (!done_) {
condvar_->TimedWait(blocking_fetch_timeout_ms_);
if (!done_) {
int64 elapsed_ms = timer->NowMs() - start_ms;
WaitResult result = kWaitSuccess;
mutex_->Lock();
while (!done_ && (result == kWaitSuccess)) {
condvar_->TimedWait(blocking_fetch_timeout_ms_);
if (!done_) {
int64 elapsed_ms = timer->NowMs() - start_ms;
if (elapsed_ms > kMaxWaitMs) {
abandoned_ = true;
// Now that we're abandoned the instaweb context needs to drop its
// pointer to us and not free us. We tell it to do this by returning
// kAbandonedAndHandled / kAbandonedAndDeclined.
//
// If we never sent headers out then it's safe to DECLINE this request
// and pass it to a different content handler. Otherwise some data was
// sent out and we have to accept this as it is.
result = headers_sent_ ? kAbandonedAndHandled : kAbandonedAndDeclined;
}

// Unlock briefly so we can write messages without holding the lock.
mutex_->Unlock();
if (result != kWaitSuccess) {
handler->Message(
kWarning, "Abandoned URL %s after %g sec (debug=%s, driver=%s) ",
mapped_url_.c_str(), elapsed_ms / 1000.0, debug_info_.c_str(),
rewrite_driver.ToString(true /* show_detached_contexts */).c_str());
} else {
handler->Message(
kWarning, "Waiting for completion of URL %s for %g sec",
mapped_url_.c_str(), elapsed_ms / 1000.0);
}
mutex_->Lock();
}
}
mutex_->Unlock();
return result;
}

bool ApacheFetch::IsCachedResultValid(const ResponseHeaders& headers) {
ScopedMutex lock(mutex_.get());
// options_ isn't valid after we're abandoned, so make sure we haven't been.
if (abandoned_) {
return false;
}
return OptionsAwareHTTPCacheCallback::IsCacheValid(
mapped_url_, *options_, request_context(), headers);
}
Expand All @@ -221,7 +316,8 @@ InstawebHandler::InstawebHandler(request_rec* request)
server_context_(InstawebContext::ServerContextFromServerRec(
request->server)),
rewrite_driver_(NULL),
num_response_attributes_(0) {
num_response_attributes_(0),
fetch_(NULL) {
apache_request_context_ = server_context_->NewApacheRequestContext(request);
request_context_.reset(apache_request_context_);

Expand All @@ -244,13 +340,31 @@ InstawebHandler::InstawebHandler(request_rec* request)
}

InstawebHandler::~InstawebHandler() {
WaitForFetch();
// If fetch_ is null we either never tried to fetch anything or it took
// ownership of itself after timing out.
if (fetch_ != NULL) {
if (WaitForFetch() == ApacheFetch::kWaitSuccess) {
delete fetch_; // Fetch completed normally.
} else {
// Fetch took ownership of itself and will continue in the background.
CHECK(fetch_ == NULL);
}
}
}

void InstawebHandler::WaitForFetch() {
if (fetch_.get() != NULL) {
fetch_->Wait();
ApacheFetch::WaitResult InstawebHandler::WaitForFetch() {
if (fetch_ == NULL) {
return ApacheFetch::kWaitSuccess; // Nothing to wait for.
}

ApacheFetch::WaitResult wait_result = fetch_->Wait(*rewrite_driver_);
if (wait_result != ApacheFetch::kWaitSuccess) {
// Give up on waiting for the fetch so we stop tying up a thread. Fetch has
// taken ownership of itself, will discard any messages it receives if it's
// abandoned, and will delete itself when Done() is called, if ever.
fetch_ = NULL;
}
return wait_result;
}

void InstawebHandler::SetupSpdyConnectionIfNeeded() {
Expand Down Expand Up @@ -279,11 +393,12 @@ RewriteDriver* InstawebHandler::MakeDriver() {
return rewrite_driver_;
}

ApacheFetch* InstawebHandler::MakeFetch(const GoogleString& url) {
DCHECK(fetch_.get() == NULL);
fetch_.reset(new ApacheFetch(url, server_context_, request_, request_context_,
options_));
return fetch_.get();
ApacheFetch* InstawebHandler::MakeFetch(const GoogleString& url,
StringPiece debug_info) {
DCHECK(fetch_ == NULL);
fetch_ = new ApacheFetch(url, debug_info, server_context_, request_,
request_context_, options_);
return fetch_;
}

/* static */
Expand Down Expand Up @@ -535,12 +650,14 @@ bool InstawebHandler::HandleAsInPlace() {
!= NULL) || (request_->user != NULL));

RewriteDriver* driver = MakeDriver();
MakeFetch(original_url_);
MakeFetch(original_url_, "ipro");
fetch_->set_handle_error(false);
driver->FetchInPlaceResource(stripped_gurl_, false /* proxy_mode */,
fetch_.get());
WaitForFetch();
if (fetch_->status_ok()) {
driver->FetchInPlaceResource(stripped_gurl_, false /* proxy_mode */, fetch_);
ApacheFetch::WaitResult wait_result = WaitForFetch();
if (wait_result != ApacheFetch::kWaitSuccess) {
// Note: fetch_ has been released; no longer safe to look at;
handled = (wait_result == ApacheFetch::kAbandonedAndHandled);
} else if (fetch_->status_ok()) {
server_context_->rewrite_stats()->ipro_served()->Add(1);
handled = true;
} else if ((fetch_->response_headers()->status_code() ==
Expand Down Expand Up @@ -596,16 +713,15 @@ bool InstawebHandler::HandleAsProxy() {
&host_header, &is_proxy) &&
is_proxy) {
RewriteDriver* driver = MakeDriver();
MakeFetch(mapped_url);
MakeFetch(mapped_url, "proxy");
fetch_->set_is_proxy(true);
driver->SetRequestHeaders(*fetch_->request_headers());
server_context_->proxy_fetch_factory()->StartNewProxyFetch(
mapped_url, fetch_.get(), driver, NULL, NULL);
WaitForFetch();
handled = true;
mapped_url, fetch_, driver, NULL, NULL);
handled = WaitForFetch() != ApacheFetch::kAbandonedAndDeclined;
}

return handled;
return handled; // false == declined
}

// Determines whether the url can be handled as a mod_pagespeed or in-place
Expand Down Expand Up @@ -960,7 +1076,7 @@ apr_status_t InstawebHandler::instaweb_handler(request_rec* request) {
server_context->StatisticsPage(is_global_statistics,
instaweb_handler.query_params(),
instaweb_handler.options(),
instaweb_handler.MakeFetch());
instaweb_handler.MakeFetch("statistics"));
return OK;
} else if ((request_handler_str == kAdminHandler) ||
(request_handler_str == kGlobalAdminHandler)) {
Expand All @@ -969,7 +1085,7 @@ apr_status_t InstawebHandler::instaweb_handler(request_rec* request) {
instaweb_handler.stripped_gurl(),
instaweb_handler.query_params(),
instaweb_handler.options(),
instaweb_handler.MakeFetch());
instaweb_handler.MakeFetch("admin"));
ret = OK;
} else if (global_config->enable_cache_purge() &&
!global_config->purge_method().empty() &&
Expand All @@ -978,20 +1094,21 @@ apr_status_t InstawebHandler::instaweb_handler(request_rec* request) {
AdminSite* admin_site = server_context->admin_site();
admin_site->PurgeHandler(instaweb_handler.original_url_,
server_context->cache_path(),
instaweb_handler.MakeFetch());
instaweb_handler.MakeFetch("purge"));
ret = OK;
} else if (request_handler_str == kConsoleHandler) {
InstawebHandler instaweb_handler(request);
server_context->ConsoleHandler(*instaweb_handler.options(),
AdminSite::kOther,
instaweb_handler.query_params(),
instaweb_handler.MakeFetch());
instaweb_handler.MakeFetch("console"));
ret = OK;
} else if (request_handler_str == kMessageHandler) {
InstawebHandler instaweb_handler(request);
server_context->MessageHistoryHandler(*instaweb_handler.options(),
AdminSite::kOther,
instaweb_handler.MakeFetch());
server_context->MessageHistoryHandler(
*instaweb_handler.options(),
AdminSite::kOther,
instaweb_handler.MakeFetch("messages"));
ret = OK;
} else if (request_handler_str == kLogRequestHeadersHandler) {
// For testing CustomFetchHeader.
Expand Down

0 comments on commit e59644f

Please sign in to comment.