Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
Add ProxyFetch::CancelOutstanding and FollowFlushes option
Browse files Browse the repository at this point in the history
- Adds ProxyFetch::CancelOutstanding() which helps ngx_pagespeed
  with shutting down quickly by calling Done(false) on all
  live ProxyFetch instances.
- Adds follow_flushes option, which makes ProxyFetch forward
  Flush calls to the RewriteDriver when set.

These changes are needed for: apache/incubator-pagespeed-ngx#936
  • Loading branch information
oschaaf committed Apr 16, 2015
1 parent 0938c2f commit 66cf2f9
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 5 deletions.
7 changes: 7 additions & 0 deletions net/instaweb/rewriter/public/rewrite_options.h
Expand Up @@ -271,6 +271,7 @@ class RewriteOptions {
static const char kFinderPropertiesCacheRefreshTimeMs[];
static const char kFlushBufferLimitBytes[];
static const char kFlushHtml[];
static const char kFollowFlushes[];
static const char kFlushMoreResourcesEarlyIfTimePermits[];
static const char kGoogleFontCssInlineMaxBytes[];
static const char kForbidAllDisabledFilters[];
Expand Down Expand Up @@ -1778,6 +1779,9 @@ class RewriteOptions {
void set_flush_html(bool x) { set_option(x, &flush_html_); }
bool flush_html() const { return flush_html_.value(); }

void set_follow_flushes(bool x) { set_option(x, &follow_flushes_); }
bool follow_flushes() const { return follow_flushes_.value(); }

void set_serve_split_html_in_two_chunks(bool x) {
set_option(x, &serve_split_html_in_two_chunks_);
}
Expand Down Expand Up @@ -3700,6 +3704,9 @@ class RewriteOptions {
Option<bool> respect_vary_;
Option<bool> respect_x_forwarded_proto_;
Option<bool> flush_html_;
// If set to true, ProxyFetch will request a flush on its RewriteDriver when
// Flush() is called on it.
Option<bool> follow_flushes_;
// Should we serve the split html response in two chunks - above the fold and
// below the fold. If set to false, we serve the above the fold and below the
// fold in a single response.
Expand Down
5 changes: 5 additions & 0 deletions net/instaweb/rewriter/rewrite_options.cc
Expand Up @@ -136,6 +136,7 @@ const char RewriteOptions::kFinderPropertiesCacheRefreshTimeMs[] =
"FinderPropertiesCacheRefreshTimeMs";
const char RewriteOptions::kFlushBufferLimitBytes[] = "FlushBufferLimitBytes";
const char RewriteOptions::kFlushHtml[] = "FlushHtml";
const char RewriteOptions::kFollowFlushes[] = "FollowFlushes";
const char RewriteOptions::kFlushMoreResourcesEarlyIfTimePermits[] =
"FlushMoreResourcesEarlyIfTimePermits";
const char RewriteOptions::kForbidAllDisabledFilters[] =
Expand Down Expand Up @@ -1577,6 +1578,10 @@ void RewriteOptions::AddProperties() {
false, &RewriteOptions::flush_html_, "fh", kFlushHtml,
kDirectoryScope,
NULL, true); // TODO(jmarantz): implement for mod_pagespeed.
AddBaseProperty(
false, &RewriteOptions::follow_flushes_, "ff", kFollowFlushes,
kDirectoryScope,
NULL, false);
AddBaseProperty(
false, &RewriteOptions::css_preserve_urls_, "cpu",
kCssPreserveURLs,
Expand Down
24 changes: 24 additions & 0 deletions net/instaweb/rewriter/server_context.cc
Expand Up @@ -884,10 +884,34 @@ void ServerContext::ShutDownDrivers() {
if (RunningOnValgrind()) {
timeout_ms *= 20;
}

// It is possible that there's still a RewriteContext associated which has
// a call scheduled to run, which will drop the last reference and bring
// the reference count to 0. That will cause SignalIfRequired to DCHECK if
// there is a pending BoundedWaitFor on the driver. To avoid that, add a
// user-reference here to pin the driver. Note that in this case, there will
// be no user-references left on the driver originally.
active->AddUserReference();
active->BoundedWaitFor(RewriteDriver::kWaitForShutDown, timeout_ms);
active->Cleanup(); // Note: only cleans up if the rewrites are complete.
// TODO(jmarantz): rename RewriteDriver::Cleanup to CleanupIfDone.
}

// It's possible that during the BoundedWaitFor the last reference was dropped
// in which case the active driver should now be contained in the deferred set.
// If it isn't, we need to call Cleanup here. We iterate again here, because
// in the earlier iteration other threads may be finishing up drivers and thus
// accessing the deferred set.
for (RewriteDriverSet::iterator i = active_rewrite_drivers_.begin();
i != active_rewrite_drivers_.end(); ++i) {
RewriteDriver* driver = *i;
if (deferred_release_rewrite_drivers_.find(driver)
== deferred_release_rewrite_drivers_.end()) {
driver->Cleanup();
DCHECK(deferred_release_rewrite_drivers_.find(driver)
!= deferred_release_rewrite_drivers_.end());
}
}
}

size_t ServerContext::num_active_rewrite_drivers() {
Expand Down
2 changes: 1 addition & 1 deletion pagespeed/automatic/Makefile
Expand Up @@ -24,7 +24,7 @@
#
# When running this Makefile from the 'automatic' directory then it will
# be set automatically.
MOD_PAGESPEED_ROOT = $(shell cd ../../..; pwd)
MOD_PAGESPEED_ROOT = $(shell cd ../..; pwd)

# OUTPUT_ROOT should be set to wherever you want to put output files. Default
# is to put them in the current directory.
Expand Down
43 changes: 39 additions & 4 deletions pagespeed/automatic/proxy_fetch.cc
Expand Up @@ -77,7 +77,8 @@ ProxyFetchFactory::ProxyFetchFactory(ServerContext* server_context)
timer_(server_context->timer()),
handler_(server_context->message_handler()),
outstanding_proxy_fetches_mutex_(
server_context->thread_system()->NewMutex()) {
server_context->thread_system()->NewMutex()),
proxy_fetches_done_in_flight_(0) {
}

ProxyFetchFactory::~ProxyFetchFactory() {
Expand All @@ -90,6 +91,37 @@ ProxyFetchFactory::~ProxyFetchFactory() {
<< " outstanding requests.";
}

void ProxyFetchFactory::CancelOutstanding() {
int64 sleep_us = 250;

// First wait any current scheduled CompleteFinishParse calls to round up,
// so we don't have to worry about races w/regard to done_outstanding_ and
// finishing_, avoiding the need to take a lock on proxy_fetch.
while (proxy_fetches_done_in_flight_.value() != 0) {
timer_->SleepUs(sleep_us);
}

// Any outstanding fetches left do not have a Done() call initiated on them.
// So we don't have to worry about done_outstanding_ and finishing_
while (true) {
ProxyFetch* fetch;
{
ScopedMutex lock(outstanding_proxy_fetches_mutex_.get());
if (outstanding_proxy_fetches_.empty()) {
break;
}
fetch = *outstanding_proxy_fetches_.begin();
outstanding_proxy_fetches_.erase(fetch);
}
fetch->Done(false);
}

// Wait for any deferred finalization to round up before exiting, releasing
// any associated drivers.
while (proxy_fetches_done_in_flight_.value() != 0) {
timer_->SleepUs(sleep_us); }
}

ProxyFetch* ProxyFetchFactory::CreateNewProxyFetch(
const GoogleString& url_in, AsyncFetch* async_fetch,
RewriteDriver* driver,
Expand Down Expand Up @@ -954,7 +986,7 @@ bool ProxyFetch::HandleFlush(MessageHandler* message_handler) {
// in ExecuteQueued. Note that this can re-order Flushes behind
// pending text, and aggregate together multiple flushes received from
// the network into one.
if (Options()->flush_html()) {
if (Options()->flush_html() || Options()->follow_flushes()) {
ScopedMutex lock(mutex_.get());
network_flush_outstanding_ = true;
ScheduleQueueExecutionIfNeeded();
Expand All @@ -966,6 +998,7 @@ bool ProxyFetch::HandleFlush(MessageHandler* message_handler) {
}

void ProxyFetch::HandleDone(bool success) {
factory_->proxy_fetches_done_in_flight_.BarrierIncrement(1);
// TODO(jmarantz): check if the server is being shut down and punt,
// possibly by calling Finish(false).
if (original_content_fetch_ != NULL) {
Expand Down Expand Up @@ -1028,8 +1061,8 @@ void ProxyFetch::ExecuteQueued() {
bool do_flush = false;
bool do_finish = false;
bool done_result = false;
bool force_flush = false;

bool force_flush = network_flush_outstanding_
&& Options()->follow_flushes();
size_t buffer_limit = Options()->flush_buffer_limit_bytes();
StringStarVector v;
{
Expand Down Expand Up @@ -1171,7 +1204,9 @@ void ProxyFetch::Finish(bool success) {
// indicates the test functionality is complete. In other contexts
// this is a no-op.
ThreadSynchronizer* sync = server_context_->thread_synchronizer();
ProxyFetchFactory* tmp = factory_;
delete this;
tmp->proxy_fetches_done_in_flight_.BarrierIncrement(-1);
sync->Signal(kHeadersSetupRaceDone);
}

Expand Down
8 changes: 8 additions & 0 deletions pagespeed/automatic/proxy_fetch.h
Expand Up @@ -89,6 +89,11 @@ class ProxyFetchFactory {
ProxyFetchPropertyCallbackCollector* property_callback,
AsyncFetch* original_content_fetch);

// Calls Done(false) on all outstanding ProxyFetch instances, and waits for
// those to round up. Meant to be called before shutting down the rewrite
// driver factory when the server doesn't give a chance to do so otherwise.
void CancelOutstanding();

// Initiates the PropertyCache lookup. See ngx_pagespeed.cc or
// proxy_interface.cc for example usage.
static ProxyFetchPropertyCallbackCollector* InitiatePropertyCacheLookup(
Expand Down Expand Up @@ -117,6 +122,9 @@ class ProxyFetchFactory {

scoped_ptr<AbstractMutex> outstanding_proxy_fetches_mutex_;
std::set<ProxyFetch*> outstanding_proxy_fetches_;
// Tracks the number of ProxyFetch instances that have Done() called but have
// not been destructed yet.
AtomicInt32 proxy_fetches_done_in_flight_;

DISALLOW_COPY_AND_ASSIGN(ProxyFetchFactory);
};
Expand Down

0 comments on commit 66cf2f9

Please sign in to comment.