diff --git a/net/instaweb/test.gyp b/net/instaweb/test.gyp index dad6c4bd4e..617c05d53f 100644 --- a/net/instaweb/test.gyp +++ b/net/instaweb/test.gyp @@ -406,6 +406,7 @@ '<(DEPTH)/third_party/apr/apr.gyp:apr', '<(DEPTH)/third_party/aprutil/aprutil.gyp:aprutil', '<(DEPTH)/third_party/httpd/httpd.gyp:include', + '<(DEPTH)/pagespeed/kernel.gyp:tcp_server_thread_for_testing', ], 'include_dirs': [ '<(DEPTH)/third_party/protobuf/src', diff --git a/pagespeed/kernel.gyp b/pagespeed/kernel.gyp index c0427f3759..94a3f35d65 100644 --- a/pagespeed/kernel.gyp +++ b/pagespeed/kernel.gyp @@ -440,6 +440,20 @@ '<(DEPTH)/third_party/gflags/gflags.gyp:gflags', ], }, + { + 'target_name': 'tcp_server_thread_for_testing', + 'type': '<(library)', + 'sources': [ + 'system/tcp_server_thread_for_testing.cc', + ], + 'include_dirs': [ + '<(DEPTH)', + ], + 'dependencies': [ + 'pagespeed_base', + '<(DEPTH)/third_party/apr/apr.gyp:apr', + ], + }, { 'target_name': 'pagespeed_image_processing', 'type': '<(library)', diff --git a/pagespeed/system/system_caches_test.cc b/pagespeed/system/system_caches_test.cc index 86deff060a..4489816d02 100644 --- a/pagespeed/system/system_caches_test.cc +++ b/pagespeed/system/system_caches_test.cc @@ -19,16 +19,16 @@ #include "pagespeed/system/system_caches.h" -#include #include #include #include #include +#include "apr_network_io.h" #include "apr_poll.h" -#include "apr_version.h" #include "apr_pools.h" #include "apr_thread_proc.h" +#include "apr_version.h" #include "base/logging.h" #include "net/instaweb/http/public/async_fetch.h" #include "net/instaweb/http/public/http_cache.h" @@ -49,9 +49,7 @@ #include "net/instaweb/util/public/cache_property_store.h" #include "net/instaweb/util/public/property_cache.h" #include "net/instaweb/util/public/property_store.h" -#include "pagespeed/kernel/base/abstract_mutex.h" #include "pagespeed/kernel/base/abstract_shared_mem.h" -#include "pagespeed/kernel/base/condvar.h" #include "pagespeed/kernel/base/gtest.h" #include "pagespeed/kernel/base/md5_hasher.h" #include "pagespeed/kernel/base/mem_file_system.h" @@ -66,7 +64,6 @@ #include "pagespeed/kernel/base/stack_buffer.h" #include "pagespeed/kernel/base/statistics.h" #include "pagespeed/kernel/base/stl_util.h" -#include "pagespeed/kernel/base/thread.h" #include "pagespeed/kernel/base/thread_system.h" #include "pagespeed/kernel/base/timer.h" #include "pagespeed/kernel/cache/async_cache.h" @@ -88,6 +85,7 @@ #include "pagespeed/kernel/util/file_system_lock_manager.h" #include "pagespeed/kernel/util/platform.h" #include "pagespeed/kernel/util/simple_random.h" +#include "pagespeed/system/tcp_server_thread_for_testing.h" namespace net_instaweb { @@ -142,18 +140,9 @@ class SystemCachesTest : public CustomRewriteTestBase { thread_system_.get()); } - apr_size_t get_fake_memcached_port() { - return fake_memcached_port_; - } - - void set_fake_memcached_port(apr_size_t port) { - fake_memcached_port_ = port; - } - protected: static const int kThreadLimit = 3; static const int kUsableMetadataCacheSize = 8 * 1024; - apr_size_t fake_memcached_port_; // Helper that blocks for async cache lookups. class BlockingCallback : public CacheInterface::Callback { @@ -222,70 +211,28 @@ class SystemCachesTest : public CustomRewriteTestBase { GoogleString value_; }; - class FakeMemcacheServerThread : public ThreadSystem::Thread { + class FakeMemcacheServerThread : public TcpServerThreadForTesting { public: - FakeMemcacheServerThread(SystemCachesTest* owner) - : Thread(owner->thread_system_.get(), "fake_memcache", - ThreadSystem::kJoinable), - owner_(owner), - mutex_(owner->thread_system_->NewMutex()), - notify_port_(mutex_->NewCondvar()) {} - virtual void Run() { + FakeMemcacheServerThread(ThreadSystem* thread_system) + : TcpServerThreadForTesting(GetDesiredListenPort(), "fake_memcache", + thread_system) {} + virtual ~FakeMemcacheServerThread() {} + + void HandleClientConnection(apr_socket_t* sock) override { static const char kMessage[] = "blah\n"; apr_size_t message_size = STATIC_STRLEN(kMessage); char buf[kStackBufferSize]; apr_size_t size = sizeof(buf) - 1; - apr_socket_t* accepted_socket = NULL; - apr_status_t status = apr_pool_create(&pool_, NULL); - ASSERT_EQ(APR_SUCCESS, status); - CreateSocketHelper(); - // Bind to an open port randomly. - // Blocks on connection from main thread. - apr_socket_bind(listening_socket_, sock_addr_); - apr_socket_listen(listening_socket_, SOMAXCONN); - apr_socket_accept(&accepted_socket, listening_socket_, pool_); - apr_socket_recv(accepted_socket, buf, &size); - apr_socket_send(accepted_socket, kMessage, &message_size); - apr_pool_destroy(pool_); - } - - void WaitForReady() { - ScopedMutex hold_lock(mutex_.get()); - while (owner_->get_fake_memcached_port() == 0) { - notify_port_->Wait(); - } + apr_socket_recv(sock, buf, &size); + apr_socket_send(sock, kMessage, &message_size); + apr_socket_close(sock); } private: - void CreateSocketHelper() { - ScopedMutex hold_lock(mutex_.get()); - SimpleRandom simple_random(owner_->thread_system_.get()->NewMutex()); - apr_size_t port_num; - const apr_int32_t kFamily = APR_INET; - sock_addr_ = NULL; - listening_socket_ = NULL; - apr_status_t status; - // Bind to an open port randomly. - do { - // Set port_num to 1024 - 65535 - port_num = (simple_random.Next() % 64511) + 1024; - status = - apr_sockaddr_info_get(&sock_addr_, 0, kFamily, port_num, 0, pool_); - if (status == APR_SUCCESS) { - status = apr_socket_create(&listening_socket_, sock_addr_->family, - SOCK_STREAM, APR_PROTO_TCP, pool_); - } - } while (status != APR_SUCCESS); - owner_->set_fake_memcached_port(port_num); - notify_port_->Signal(); + static int GetDesiredListenPort() { + return desired_listen_port_; } - - SystemCachesTest* owner_; - scoped_ptr mutex_; - scoped_ptr notify_port_; - apr_pool_t* pool_; - apr_sockaddr_t* sock_addr_; - apr_socket_t* listening_socket_; + static int desired_listen_port_; }; SystemCachesTest() @@ -562,6 +509,9 @@ class SystemCachesTest : public CustomRewriteTestBase { GoogleString server_spec_; // Set lazily by MemCachedServerSpec() }; +int SystemCachesTest::FakeMemcacheServerThread::desired_listen_port_ = 0; + + TEST_F(SystemCachesTest, BasicFileAndLruCache) { options_->set_file_cache_path(kCachePath); options_->set_use_shared_mem_locking(false); @@ -1104,14 +1054,11 @@ TEST_F(SystemCachesTest, HangingMultigetTest) { // Test that we do not hang in the case of corrupted responses from memcached, // as seen in bug report 1048 // https://github.com/pagespeed/mod_pagespeed/issues/1048 - set_fake_memcached_port(0); scoped_ptr thread( - new FakeMemcacheServerThread(this)); + new FakeMemcacheServerThread(thread_system_.get())); ASSERT_TRUE(thread->Start()); - thread->WaitForReady(); - ASSERT_NE(get_fake_memcached_port(), 0); - GoogleString apr_str = - StrCat("localhost:", Integer64ToString(get_fake_memcached_port())); + apr_port_t port = thread->GetListeningPort(); + GoogleString apr_str = StrCat("localhost:", Integer64ToString(port)); AprMemCache* cache = system_caches_->NewAprMemCache(apr_str); static const char k1[] = "hello"; static const char k2[] = "hi"; @@ -1137,7 +1084,8 @@ TEST_F(SystemCachesTest, HangingMultigetTest) { close(err_pipe[1]); // Make the multiget request. cache->MultiGet(request); - thread->Join(); + cb1.Block(); + cb2.Block(); fflush(stderr); int bytes_read = read(err_pipe[0], buffer, sizeof(buffer)); ASSERT_NE(-1, bytes_read); diff --git a/pagespeed/system/tcp_server_thread_for_testing.cc b/pagespeed/system/tcp_server_thread_for_testing.cc new file mode 100644 index 0000000000..cbafe68086 --- /dev/null +++ b/pagespeed/system/tcp_server_thread_for_testing.cc @@ -0,0 +1,103 @@ +// Copyright 2016 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: cheesy@google.com (cheesy@google.com) + +#include "pagespeed/system/tcp_server_thread_for_testing.h" + +#include + +#include "apr_network_io.h" +#include "base/logging.h" +#include "pagespeed/kernel/base/abstract_mutex.h" +#include "pagespeed/system/apr_thread_compatible_pool.h" + +namespace net_instaweb { + +TcpServerThreadForTesting::TcpServerThreadForTesting( + apr_port_t listen_port, StringPiece name, ThreadSystem* thread_system) + : Thread(thread_system, name, ThreadSystem::kJoinable), + mutex_(thread_system->NewMutex()), + ready_notify_(mutex_->NewCondvar()), + pool_(AprCreateThreadCompatiblePool(nullptr)), + requested_listen_port_(listen_port), + actual_listening_port_(0) { +} + +TcpServerThreadForTesting::~TcpServerThreadForTesting() { + this->Join(); + if (pool_ != nullptr) { + apr_pool_destroy(pool_); + } +} + +void TcpServerThreadForTesting::Run() { + apr_socket_t* listen_sock = CreateAndBindSocket(); + apr_socket_t* accepted_socket; + apr_status_t status = apr_socket_accept(&accepted_socket, listen_sock, pool_); + CHECK_EQ(status, APR_SUCCESS) + << "TcpServerThreadForTesting apr_socket_accept"; + HandleClientConnection(accepted_socket); + apr_socket_close(listen_sock); +} + +apr_port_t TcpServerThreadForTesting::GetListeningPort() { + ScopedMutex lock(mutex_.get()); + while (actual_listening_port_ == 0) { + ready_notify_->Wait(); + } + return actual_listening_port_; +} + +apr_socket_t* TcpServerThreadForTesting::CreateAndBindSocket() { + apr_socket_t* sock; + // Create TCP socket with SO_REUSEADDR. + apr_status_t status = + apr_socket_create(&sock, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool_); + CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_create"; + status = apr_socket_opt_set(sock, APR_SO_REUSEADDR, 1); + CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_opt_set"; + + // requested_listen_port_ may be zero, in which case apr_socket_bind will + // pick a port for us. + apr_sockaddr_t* sa; + status = apr_sockaddr_info_get(&sa, "127.0.0.1", APR_INET, + requested_listen_port_, 0 /* flags */, pool_); + CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_sockaddr_info_get"; + + // bind and listen. + status = apr_socket_bind(sock, sa); + CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_bind"; + status = apr_socket_listen(sock, 1 /* backlog */); + CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_listen"; + + // Now the socket is bound and listening, find the local port we're actually + // using. If requested_listen_port_ is non-zero, they really should match. + apr_sockaddr_t* bound_sa; + status = apr_socket_addr_get(&bound_sa, APR_LOCAL, sock); + CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_addr_get"; + if (requested_listen_port_ != 0) { + CHECK_EQ(requested_listen_port_, bound_sa->port); + } + + { + ScopedMutex lock(mutex_.get()); + actual_listening_port_ = bound_sa->port; + ready_notify_->Broadcast(); + } + + return sock; +} + +} // namespace net_instaweb diff --git a/pagespeed/system/tcp_server_thread_for_testing.h b/pagespeed/system/tcp_server_thread_for_testing.h new file mode 100644 index 0000000000..7db9d36660 --- /dev/null +++ b/pagespeed/system/tcp_server_thread_for_testing.h @@ -0,0 +1,69 @@ +// Copyright 2016 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Author: cheesy@google.com (Steve Hill) + +#ifndef PAGESPEED_KERNEL_BASE_TCP_SERVER_THREAD_FOR_TESTING_H_ +#define PAGESPEED_KERNEL_BASE_TCP_SERVER_THREAD_FOR_TESTING_H_ + +#include "apr_pools.h" +#include "apr_network_io.h" +#include "pagespeed/kernel/base/condvar.h" +#include "pagespeed/kernel/base/scoped_ptr.h" +#include "pagespeed/kernel/base/string_util.h" +#include "pagespeed/kernel/base/thread.h" +#include "pagespeed/kernel/base/thread_annotations.h" +#include "pagespeed/kernel/base/thread_system.h" + +namespace net_instaweb { + +// Implementation of Thread that uses APR to listen on a TCP port and then +// delegates to a virtual function to handle single connection. This code is +// absolutely not suitable for use outside of tests. + +class TcpServerThreadForTesting : public ThreadSystem::Thread { + public: + // listen_port may be 0, in which case the system will pick the port. + // The socket isn't actually created until the thread is Start()ed. + TcpServerThreadForTesting(apr_port_t listen_port, StringPiece thread_name, + ThreadSystem* thread_system); + // Calls this->Join() which will hang if a connection has not been handled. + virtual ~TcpServerThreadForTesting(); + + // Wait for thread to successfully start listening and then return the actual + // bound port number, which will be bound to IPv4 localhost. + apr_port_t GetListeningPort(); + + private: + // Called after a successful call to apr_accept. Implementor must close + // sock. + virtual void HandleClientConnection(apr_socket_t* sock) = 0; + + // Returns a socket bound to requested_listen_port_ if non-zero, otherwise + // whatever the system picked. Updates actual_listening_port_. + apr_socket_t* CreateAndBindSocket(); + + // Thread implementation. + void Run() override; + + scoped_ptr mutex_; + scoped_ptr ready_notify_; + apr_pool_t* pool_; + const apr_port_t requested_listen_port_; + apr_port_t actual_listening_port_ GUARDED_BY(mutex_); +}; + +} // namespace net_instaweb + +#endif // PAGESPEED_KERNEL_BASE_TCP_SERVER_THREAD_FOR_TESTING_H_