Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
Create TcpServerThreadForTesting and re-factor FakeMemcacheServerThre…
Browse files Browse the repository at this point in the history
…ad to use it.
  • Loading branch information
hillsp committed Jun 21, 2016
1 parent eafbee0 commit 66d8196
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 76 deletions.
1 change: 1 addition & 0 deletions net/instaweb/test.gyp
Expand Up @@ -406,6 +406,7 @@
'<(DEPTH)/third_party/apr/apr.gyp:apr',
'<(DEPTH)/third_party/aprutil/aprutil.gyp:aprutil',
'<(DEPTH)/third_party/httpd/httpd.gyp:include',
'<(DEPTH)/pagespeed/kernel.gyp:tcp_server_thread_for_testing',
],
'include_dirs': [
'<(DEPTH)/third_party/protobuf/src',
Expand Down
14 changes: 14 additions & 0 deletions pagespeed/kernel.gyp
Expand Up @@ -440,6 +440,20 @@
'<(DEPTH)/third_party/gflags/gflags.gyp:gflags',
],
},
{
'target_name': 'tcp_server_thread_for_testing',
'type': '<(library)',
'sources': [
'system/tcp_server_thread_for_testing.cc',
],
'include_dirs': [
'<(DEPTH)',
],
'dependencies': [
'pagespeed_base',
'<(DEPTH)/third_party/apr/apr.gyp:apr',
],
},
{
'target_name': 'pagespeed_image_processing',
'type': '<(library)',
Expand Down
100 changes: 24 additions & 76 deletions pagespeed/system/system_caches_test.cc
Expand Up @@ -19,16 +19,16 @@

#include "pagespeed/system/system_caches.h"

#include <sys/socket.h>
#include <cstdlib>
#include <cstdio>
#include <unistd.h>
#include <vector>

#include "apr_network_io.h"
#include "apr_poll.h"
#include "apr_version.h"
#include "apr_pools.h"
#include "apr_thread_proc.h"
#include "apr_version.h"
#include "base/logging.h"
#include "net/instaweb/http/public/async_fetch.h"
#include "net/instaweb/http/public/http_cache.h"
Expand All @@ -49,9 +49,7 @@
#include "net/instaweb/util/public/cache_property_store.h"
#include "net/instaweb/util/public/property_cache.h"
#include "net/instaweb/util/public/property_store.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/abstract_shared_mem.h"
#include "pagespeed/kernel/base/condvar.h"
#include "pagespeed/kernel/base/gtest.h"
#include "pagespeed/kernel/base/md5_hasher.h"
#include "pagespeed/kernel/base/mem_file_system.h"
Expand All @@ -66,7 +64,6 @@
#include "pagespeed/kernel/base/stack_buffer.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/stl_util.h"
#include "pagespeed/kernel/base/thread.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/cache/async_cache.h"
Expand All @@ -88,6 +85,7 @@
#include "pagespeed/kernel/util/file_system_lock_manager.h"
#include "pagespeed/kernel/util/platform.h"
#include "pagespeed/kernel/util/simple_random.h"
#include "pagespeed/system/tcp_server_thread_for_testing.h"

namespace net_instaweb {

Expand Down Expand Up @@ -142,18 +140,9 @@ class SystemCachesTest : public CustomRewriteTestBase<SystemRewriteOptions> {
thread_system_.get());
}

apr_size_t get_fake_memcached_port() {
return fake_memcached_port_;
}

void set_fake_memcached_port(apr_size_t port) {
fake_memcached_port_ = port;
}

protected:
static const int kThreadLimit = 3;
static const int kUsableMetadataCacheSize = 8 * 1024;
apr_size_t fake_memcached_port_;

// Helper that blocks for async cache lookups.
class BlockingCallback : public CacheInterface::Callback {
Expand Down Expand Up @@ -222,70 +211,28 @@ class SystemCachesTest : public CustomRewriteTestBase<SystemRewriteOptions> {
GoogleString value_;
};

class FakeMemcacheServerThread : public ThreadSystem::Thread {
class FakeMemcacheServerThread : public TcpServerThreadForTesting {
public:
FakeMemcacheServerThread(SystemCachesTest* owner)
: Thread(owner->thread_system_.get(), "fake_memcache",
ThreadSystem::kJoinable),
owner_(owner),
mutex_(owner->thread_system_->NewMutex()),
notify_port_(mutex_->NewCondvar()) {}
virtual void Run() {
FakeMemcacheServerThread(ThreadSystem* thread_system)
: TcpServerThreadForTesting(GetDesiredListenPort(), "fake_memcache",
thread_system) {}
virtual ~FakeMemcacheServerThread() {}

void HandleClientConnection(apr_socket_t* sock) override {
static const char kMessage[] = "blah\n";
apr_size_t message_size = STATIC_STRLEN(kMessage);
char buf[kStackBufferSize];
apr_size_t size = sizeof(buf) - 1;
apr_socket_t* accepted_socket = NULL;
apr_status_t status = apr_pool_create(&pool_, NULL);
ASSERT_EQ(APR_SUCCESS, status);
CreateSocketHelper();
// Bind to an open port randomly.
// Blocks on connection from main thread.
apr_socket_bind(listening_socket_, sock_addr_);
apr_socket_listen(listening_socket_, SOMAXCONN);
apr_socket_accept(&accepted_socket, listening_socket_, pool_);
apr_socket_recv(accepted_socket, buf, &size);
apr_socket_send(accepted_socket, kMessage, &message_size);
apr_pool_destroy(pool_);
}

void WaitForReady() {
ScopedMutex hold_lock(mutex_.get());
while (owner_->get_fake_memcached_port() == 0) {
notify_port_->Wait();
}
apr_socket_recv(sock, buf, &size);
apr_socket_send(sock, kMessage, &message_size);
apr_socket_close(sock);
}

private:
void CreateSocketHelper() {
ScopedMutex hold_lock(mutex_.get());
SimpleRandom simple_random(owner_->thread_system_.get()->NewMutex());
apr_size_t port_num;
const apr_int32_t kFamily = APR_INET;
sock_addr_ = NULL;
listening_socket_ = NULL;
apr_status_t status;
// Bind to an open port randomly.
do {
// Set port_num to 1024 - 65535
port_num = (simple_random.Next() % 64511) + 1024;
status =
apr_sockaddr_info_get(&sock_addr_, 0, kFamily, port_num, 0, pool_);
if (status == APR_SUCCESS) {
status = apr_socket_create(&listening_socket_, sock_addr_->family,
SOCK_STREAM, APR_PROTO_TCP, pool_);
}
} while (status != APR_SUCCESS);
owner_->set_fake_memcached_port(port_num);
notify_port_->Signal();
static int GetDesiredListenPort() {
return desired_listen_port_;
}

SystemCachesTest* owner_;
scoped_ptr<ThreadSystem::CondvarCapableMutex> mutex_;
scoped_ptr<ThreadSystem::Condvar> notify_port_;
apr_pool_t* pool_;
apr_sockaddr_t* sock_addr_;
apr_socket_t* listening_socket_;
static int desired_listen_port_;
};

SystemCachesTest()
Expand Down Expand Up @@ -562,6 +509,9 @@ class SystemCachesTest : public CustomRewriteTestBase<SystemRewriteOptions> {
GoogleString server_spec_; // Set lazily by MemCachedServerSpec()
};

int SystemCachesTest::FakeMemcacheServerThread::desired_listen_port_ = 0;


TEST_F(SystemCachesTest, BasicFileAndLruCache) {
options_->set_file_cache_path(kCachePath);
options_->set_use_shared_mem_locking(false);
Expand Down Expand Up @@ -1104,14 +1054,11 @@ TEST_F(SystemCachesTest, HangingMultigetTest) {
// Test that we do not hang in the case of corrupted responses from memcached,
// as seen in bug report 1048
// https://github.com/pagespeed/mod_pagespeed/issues/1048
set_fake_memcached_port(0);
scoped_ptr<FakeMemcacheServerThread> thread(
new FakeMemcacheServerThread(this));
new FakeMemcacheServerThread(thread_system_.get()));
ASSERT_TRUE(thread->Start());
thread->WaitForReady();
ASSERT_NE(get_fake_memcached_port(), 0);
GoogleString apr_str =
StrCat("localhost:", Integer64ToString(get_fake_memcached_port()));
apr_port_t port = thread->GetListeningPort();
GoogleString apr_str = StrCat("localhost:", Integer64ToString(port));
AprMemCache* cache = system_caches_->NewAprMemCache(apr_str);
static const char k1[] = "hello";
static const char k2[] = "hi";
Expand All @@ -1137,7 +1084,8 @@ TEST_F(SystemCachesTest, HangingMultigetTest) {
close(err_pipe[1]);
// Make the multiget request.
cache->MultiGet(request);
thread->Join();
cb1.Block();
cb2.Block();
fflush(stderr);
int bytes_read = read(err_pipe[0], buffer, sizeof(buffer));
ASSERT_NE(-1, bytes_read);
Expand Down
103 changes: 103 additions & 0 deletions pagespeed/system/tcp_server_thread_for_testing.cc
@@ -0,0 +1,103 @@
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (cheesy@google.com)

#include "pagespeed/system/tcp_server_thread_for_testing.h"

#include <sys/socket.h>

#include "apr_network_io.h"
#include "base/logging.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/system/apr_thread_compatible_pool.h"

namespace net_instaweb {

TcpServerThreadForTesting::TcpServerThreadForTesting(
apr_port_t listen_port, StringPiece name, ThreadSystem* thread_system)
: Thread(thread_system, name, ThreadSystem::kJoinable),
mutex_(thread_system->NewMutex()),
ready_notify_(mutex_->NewCondvar()),
pool_(AprCreateThreadCompatiblePool(nullptr)),
requested_listen_port_(listen_port),
actual_listening_port_(0) {
}

TcpServerThreadForTesting::~TcpServerThreadForTesting() {
this->Join();
if (pool_ != nullptr) {
apr_pool_destroy(pool_);
}
}

void TcpServerThreadForTesting::Run() {
apr_socket_t* listen_sock = CreateAndBindSocket();
apr_socket_t* accepted_socket;
apr_status_t status = apr_socket_accept(&accepted_socket, listen_sock, pool_);
CHECK_EQ(status, APR_SUCCESS)
<< "TcpServerThreadForTesting apr_socket_accept";
HandleClientConnection(accepted_socket);
apr_socket_close(listen_sock);
}

apr_port_t TcpServerThreadForTesting::GetListeningPort() {
ScopedMutex lock(mutex_.get());
while (actual_listening_port_ == 0) {
ready_notify_->Wait();
}
return actual_listening_port_;
}

apr_socket_t* TcpServerThreadForTesting::CreateAndBindSocket() {
apr_socket_t* sock;
// Create TCP socket with SO_REUSEADDR.
apr_status_t status =
apr_socket_create(&sock, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool_);
CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_create";
status = apr_socket_opt_set(sock, APR_SO_REUSEADDR, 1);
CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_opt_set";

// requested_listen_port_ may be zero, in which case apr_socket_bind will
// pick a port for us.
apr_sockaddr_t* sa;
status = apr_sockaddr_info_get(&sa, "127.0.0.1", APR_INET,
requested_listen_port_, 0 /* flags */, pool_);
CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_sockaddr_info_get";

// bind and listen.
status = apr_socket_bind(sock, sa);
CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_bind";
status = apr_socket_listen(sock, 1 /* backlog */);
CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_listen";

// Now the socket is bound and listening, find the local port we're actually
// using. If requested_listen_port_ is non-zero, they really should match.
apr_sockaddr_t* bound_sa;
status = apr_socket_addr_get(&bound_sa, APR_LOCAL, sock);
CHECK_EQ(status, APR_SUCCESS) << "CreateAndBindSocket apr_socket_addr_get";
if (requested_listen_port_ != 0) {
CHECK_EQ(requested_listen_port_, bound_sa->port);
}

{
ScopedMutex lock(mutex_.get());
actual_listening_port_ = bound_sa->port;
ready_notify_->Broadcast();
}

return sock;
}

} // namespace net_instaweb
69 changes: 69 additions & 0 deletions pagespeed/system/tcp_server_thread_for_testing.h
@@ -0,0 +1,69 @@
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: cheesy@google.com (Steve Hill)

#ifndef PAGESPEED_KERNEL_BASE_TCP_SERVER_THREAD_FOR_TESTING_H_
#define PAGESPEED_KERNEL_BASE_TCP_SERVER_THREAD_FOR_TESTING_H_

#include "apr_pools.h"
#include "apr_network_io.h"
#include "pagespeed/kernel/base/condvar.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/thread.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"

namespace net_instaweb {

// Implementation of Thread that uses APR to listen on a TCP port and then
// delegates to a virtual function to handle single connection. This code is
// absolutely not suitable for use outside of tests.

class TcpServerThreadForTesting : public ThreadSystem::Thread {
public:
// listen_port may be 0, in which case the system will pick the port.
// The socket isn't actually created until the thread is Start()ed.
TcpServerThreadForTesting(apr_port_t listen_port, StringPiece thread_name,
ThreadSystem* thread_system);
// Calls this->Join() which will hang if a connection has not been handled.
virtual ~TcpServerThreadForTesting();

// Wait for thread to successfully start listening and then return the actual
// bound port number, which will be bound to IPv4 localhost.
apr_port_t GetListeningPort();

private:
// Called after a successful call to apr_accept. Implementor must close
// sock.
virtual void HandleClientConnection(apr_socket_t* sock) = 0;

// Returns a socket bound to requested_listen_port_ if non-zero, otherwise
// whatever the system picked. Updates actual_listening_port_.
apr_socket_t* CreateAndBindSocket();

// Thread implementation.
void Run() override;

scoped_ptr<ThreadSystem::CondvarCapableMutex> mutex_;
scoped_ptr<ThreadSystem::Condvar> ready_notify_;
apr_pool_t* pool_;
const apr_port_t requested_listen_port_;
apr_port_t actual_listening_port_ GUARDED_BY(mutex_);
};

} // namespace net_instaweb

#endif // PAGESPEED_KERNEL_BASE_TCP_SERVER_THREAD_FOR_TESTING_H_

0 comments on commit 66d8196

Please sign in to comment.