diff --git a/html/doc/system.html b/html/doc/system.html index 7ef009d47a..6904184ef9 100644 --- a/html/doc/system.html +++ b/html/doc/system.html @@ -475,6 +475,18 @@

Configuring Redis

Nginx:
pagespeed RedisReconnectionDelayMs timeout_in_milliseconds;
+

+ You can also select the Redis logical database having the specified numeric index. + This setting defaults to database index 0. + Note that the Redis clustering feature does not allow specifying a database index, + and therefore is incompatible with this setting: +

+
+
Apache:
ModPagespeedRedisDatabaseIndex index
+
Nginx:
pagespeed RedisDatabaseIndex index;
+

Flushing PageSpeed Server-Side Cache

diff --git a/install/debug.conf.template b/install/debug.conf.template index 1ff9e99b64..1aac4a8bb1 100644 --- a/install/debug.conf.template +++ b/install/debug.conf.template @@ -2251,6 +2251,7 @@ AddType application/javascript .js #ALL_DIRECTIVES ModPagespeedRedisServer localhost:55555 #ALL_DIRECTIVES ModPagespeedRedisReconnectionDelayMs 1000 #ALL_DIRECTIVES ModPagespeedRedisTimeoutUs 50000 +#ALL_DIRECTIVES ModPagespeedRedisDatabaseIndex 0 #ALL_DIRECTIVES ModPagespeedReportUnloadTime true #ALL_DIRECTIVES ModPagespeedRespectVary true #ALL_DIRECTIVES ModPagespeedRespectXForwardedProto off diff --git a/pagespeed/system/redis_cache.cc b/pagespeed/system/redis_cache.cc index 46c64557ae..438dd124b6 100644 --- a/pagespeed/system/redis_cache.cc +++ b/pagespeed/system/redis_cache.cc @@ -77,6 +77,8 @@ namespace net_instaweb { // // TODO(yeputons): consider removing limit on amount of redirections. static const int kMaxRedirections = 1; +static const int kDefaultDatabaseIndex = 0; +static const int kRedisDatabaseIndexNotSet = -1; const char kRedisClusterRedirections[] = "redis_cluster_redirections"; const char kRedisClusterSlotsFetches[] = "redis_cluster_slots_fetches"; @@ -84,7 +86,7 @@ const char kRedisClusterSlotsFetches[] = "redis_cluster_slots_fetches"; RedisCache::RedisCache(StringPiece host, int port, ThreadSystem* thread_system, MessageHandler* message_handler, Timer* timer, int64 reconnection_delay_ms, int64 timeout_us, - Statistics* stats) + Statistics* stats, int database_index) : main_host_(host.as_string()), main_port_(port), thread_system_(thread_system), @@ -95,7 +97,8 @@ RedisCache::RedisCache(StringPiece host, int port, ThreadSystem* thread_system, thread_synchronizer_(new ThreadSynchronizer(thread_system)), connections_lock_(thread_system_->NewRWLock()), cluster_map_lock_(thread_system_->NewRWLock()), - main_connection_(nullptr) { + main_connection_(nullptr), + database_index_(database_index) { redirections_ = stats->GetVariable(kRedisClusterRedirections); cluster_slots_fetches_ = stats->GetVariable(kRedisClusterSlotsFetches); } @@ -118,7 +121,7 @@ void RedisCache::StartUp(bool connect_now) { CHECK(connections_.empty()); CHECK(!main_connection_); std::unique_ptr conn( - new Connection(this, main_host_, main_port_)); + new Connection(this, main_host_, main_port_, database_index_)); main_connection_ = conn.get(); connections_.emplace(StrCat(main_host_, ":", IntegerToString(main_port_)), std::move(conn)); @@ -251,7 +254,7 @@ RedisCache::RedisReply RedisCache::RedisCommand( for (redirections = 0; redirections <= kMaxRedirections; redirections++, last_redirecting_connection = conn, - conn = GetOrCreateConnection(redirected_to), + conn = GetOrCreateConnection(redirected_to, kDefaultDatabaseIndex), redirections_->Add(1)) { ScopedMutex lock(conn->GetOperationMutex()); @@ -346,7 +349,7 @@ ExternalServerSpec RedisCache::ParseRedirectionError(StringPiece error) { } RedisCache::Connection* RedisCache::GetOrCreateConnection( - ExternalServerSpec spec) { + ExternalServerSpec spec, const int database_index) { Connection* result; bool should_start_up = false; { @@ -356,7 +359,8 @@ RedisCache::Connection* RedisCache::GetOrCreateConnection( if (it == connections_.end()) { LOG(INFO) << "Initiating connection Redis server at " << spec.ToString(); it = connections_.emplace(name, std::unique_ptr( - new Connection(this, spec.host, spec.port))).first; + new Connection(this, spec.host, spec.port, database_index))) + .first; should_start_up = true; } result = it->second.get(); @@ -453,10 +457,11 @@ void RedisCache::FetchClusterSlotMapping(Connection* connection) { return; } // Everything is there and is the right type. Store it. + // Using database 0 for cluster new_cluster_mappings.push_back(ClusterMapping( start_slot_range->integer, end_slot_range->integer, GetOrCreateConnection(ExternalServerSpec( - master_ip->str, master_port->integer)))); + master_ip->str, master_port->integer), kDefaultDatabaseIndex))); } // Sort new_cluster_mappings based on start_slot_range_. @@ -513,7 +518,7 @@ RedisCache::Connection* RedisCache::LookupConnection(StringPiece key) { } RedisCache::Connection::Connection(RedisCache* redis_cache, StringPiece host, - int port) + int port, int database_index) : redis_cache_(redis_cache), host_(host.as_string()), port_(port), @@ -521,7 +526,8 @@ RedisCache::Connection::Connection(RedisCache* redis_cache, StringPiece host, state_mutex_(redis_cache_->thread_system_->NewMutex()), redis_(nullptr), state_(kShutDown), - next_reconnect_at_ms_(redis_cache_->timer_->NowMs()) {} + next_reconnect_at_ms_(redis_cache_->timer_->NowMs()), + database_index_(database_index) {} void RedisCache::Connection::StartUp(bool connect_now) { CHECK_NE("", host_); @@ -533,7 +539,7 @@ void RedisCache::Connection::StartUp(bool connect_now) { state_ = kDisconnected; } if (connect_now) { - EnsureConnection(); + EnsureConnectionAndDatabaseSelection(); } } @@ -559,6 +565,21 @@ void RedisCache::Connection::ShutDown() { state_ = kShutDown; } +bool RedisCache::Connection::EnsureConnectionAndDatabaseSelection() { + { + ScopedMutex lock(state_mutex_.get()); + if (state_ == kConnected) { + return true; + } + } + + if (!EnsureConnection()) { + return false; + } + + return EnsureDatabaseSelection(); +} + bool RedisCache::Connection::EnsureConnection() { { ScopedMutex lock(state_mutex_.get()); @@ -599,6 +620,21 @@ bool RedisCache::Connection::EnsureConnection() { return state_ == kConnected; } +bool RedisCache::Connection::EnsureDatabaseSelection() { + // dont select database if database index property not specified in config + if (database_index_ != kRedisDatabaseIndexNotSet) { + RedisReply reply = RedisCommand(StrCat("SELECT ", + IntegerToString(database_index_)).c_str(), REDIS_REPLY_STRING); + if (reply == nullptr) { + ScopedMutex lock(state_mutex_.get()); + state_ = kDisconnected; + redis_.reset(); + return false; + } + } + return true; +} + RedisCache::RedisContext RedisCache::Connection::TryConnect() { struct timeval timeout; timeout.tv_sec = redis_cache_->timeout_us_ / Timer::kSecondUs; @@ -655,7 +691,7 @@ void RedisCache::Connection::UpdateState() { RedisCache::RedisReply RedisCache::Connection::RedisCommand(const char* format, va_list args) { - if (!EnsureConnection()) { + if (!EnsureConnectionAndDatabaseSelection()) { return nullptr; } diff --git a/pagespeed/system/redis_cache.h b/pagespeed/system/redis_cache.h index e3524638d4..5d967c1c6b 100644 --- a/pagespeed/system/redis_cache.h +++ b/pagespeed/system/redis_cache.h @@ -80,7 +80,7 @@ class RedisCache : public CacheInterface { RedisCache(StringPiece host, int port, ThreadSystem* thread_system, MessageHandler* message_handler, Timer* timer, int64 reconnection_delay_ms, int64 timeout_us, - Statistics* stats); + Statistics* stats, int database_index); ~RedisCache() override { ShutDown(); } static void InitStats(Statistics* stats); @@ -142,7 +142,8 @@ class RedisCache : public CacheInterface { class Connection { public: - Connection(RedisCache* redis_cache, StringPiece host, int port); + Connection(RedisCache* redis_cache, StringPiece host, int port, + int database_index); void StartUp(bool connect_now = true) LOCKS_EXCLUDED(redis_mutex_, state_mutex_); @@ -179,8 +180,14 @@ class RedisCache : public CacheInterface { bool IsHealthyLockHeld() const EXCLUSIVE_LOCKS_REQUIRED(state_mutex_); void UpdateState() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_, state_mutex_); + // connects with redis as well as selects redis database + bool EnsureConnectionAndDatabaseSelection() + EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) + LOCKS_EXCLUDED(state_mutex_); bool EnsureConnection() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_); + bool EnsureDatabaseSelection() EXCLUSIVE_LOCKS_REQUIRED(state_mutex_) + LOCKS_EXCLUDED(state_mutex_); RedisContext TryConnect() LOCKS_EXCLUDED(redis_mutex_, state_mutex_); @@ -196,6 +203,10 @@ class RedisCache : public CacheInterface { State state_ GUARDED_BY(state_mutex_); int64 next_reconnect_at_ms_ GUARDED_BY(state_mutex_); + // selected database is a property of the connection, + // should re-select it on reconnection + const int database_index_; + DISALLOW_COPY_AND_ASSIGN(Connection); }; typedef std::map> ConnectionsMap; @@ -233,7 +244,8 @@ class RedisCache : public CacheInterface { // Must not be called under Connection::GetOperationLock(), that will cause // lock inversion and potential theoretical deadlock. - Connection* GetOrCreateConnection(ExternalServerSpec spec); + Connection* GetOrCreateConnection(ExternalServerSpec spec, + const int database_index); // Ask redis what keys should go to which servers. void FetchClusterSlotMapping(Connection* connection) @@ -267,6 +279,8 @@ class RedisCache : public CacheInterface { // Not guarded, but should only be modified in StartUp(). Connection* main_connection_; + const int database_index_; + friend class RedisCacheTest; DISALLOW_COPY_AND_ASSIGN(RedisCache); }; diff --git a/pagespeed/system/redis_cache_cluster_test.cc b/pagespeed/system/redis_cache_cluster_test.cc index 37085050bd..ccf61928a7 100644 --- a/pagespeed/system/redis_cache_cluster_test.cc +++ b/pagespeed/system/redis_cache_cluster_test.cc @@ -47,6 +47,7 @@ namespace { static const int kReconnectionDelayMs = 10; static const int kTimeoutUs = 100 * Timer::kMsUs; static const int kSlaveNodesFlushingTimeoutMs = 1000; + static const int kDatabaseIndex = 0; // One can check following constants with CLUSTER KEYSLOT command. // For testing purposes, both KEY and {}KEY should be in the same slot range. @@ -101,7 +102,7 @@ class RedisCacheClusterTest : public CacheTestBase { // Setting up cache. cache_.reset(new RedisCache("localhost", ports_[0], thread_system_.get(), &handler_, &timer_, kReconnectionDelayMs, - kTimeoutUs, &statistics_)); + kTimeoutUs, &statistics_, kDatabaseIndex)); cache_->StartUp(); return true; } diff --git a/pagespeed/system/redis_cache_test.cc b/pagespeed/system/redis_cache_test.cc index e6931081ef..bfe53b15ae 100644 --- a/pagespeed/system/redis_cache_test.cc +++ b/pagespeed/system/redis_cache_test.cc @@ -45,6 +45,7 @@ namespace net_instaweb { namespace { static const int kReconnectionDelayMs = 10; static const int kTimeoutUs = 100 * Timer::kMsUs; + static const int kDatabaseIndex[] = {0, 1}; static const char kSomeKey[] = "SomeKey"; static const char kSomeValue[] = "SomeValue"; } @@ -58,11 +59,12 @@ class RedisCacheTest : public CacheTestBase { RedisCacheTest() : thread_system_(Platform::CreateThreadSystem()), statistics_(thread_system_.get()), - timer_(new NullMutex, 0) { + timer_(new NullMutex, 0), + redis_port_env_(0) { RedisCache::InitStats(&statistics_); } - bool InitRedisOrSkip() { + bool PrepareRedisOrSkip() { const char* portString = getenv("REDIS_PORT"); int port; if (portString == nullptr || !StringToInt(portString, &port)) { @@ -73,6 +75,8 @@ class RedisCacheTest : public CacheTestBase { return false; } + redis_port_env_ = port; + { TcpConnectionForTesting conn; CHECK(conn.Connect("localhost", port)) @@ -80,28 +84,31 @@ class RedisCacheTest : public CacheTestBase { conn.Send("FLUSHALL\r\n"); CHECK_EQ("+OK\r\n", conn.ReadLineCrLf()); } - - cache_.reset(new RedisCache("localhost", port, thread_system_.get(), - &handler_, &timer_, kReconnectionDelayMs, - kTimeoutUs, &statistics_)); - cache_->StartUp(); return true; } + void InitRedisWithCustomDatabaseIndex(const int database_index) { + cache_.emplace_back(new RedisCache("localhost", redis_port_env_, + thread_system_.get(), &handler_, &timer_, + kReconnectionDelayMs, kTimeoutUs, &statistics_, + database_index)); + cache_.back()->StartUp(); + } + void InitRedisWithCustomServer() { - cache_.reset(new RedisCache("localhost", custom_server_port_, + cache_.emplace_back(new RedisCache("localhost", custom_server_port_, thread_system_.get(), &handler_, &timer_, kReconnectionDelayMs, kTimeoutUs, - &statistics_)); + &statistics_, kDatabaseIndex[0])); } void InitRedisWithUnreachableServer() { // Try to connect to some definitely unreachable host. // 192.0.2.0/24 is reserved for documentation purposes in RFC5737 and no // machine should ever be routable in that subnet. - cache_.reset(new RedisCache("192.0.2.1", 12345, thread_system_.get(), + cache_.emplace_back(new RedisCache("192.0.2.1", 12345, thread_system_.get(), &handler_, &timer_, kReconnectionDelayMs, - kTimeoutUs, &statistics_)); + kTimeoutUs, &statistics_, kDatabaseIndex[0])); } static void SetUpTestCase() { @@ -130,13 +137,13 @@ class RedisCacheTest : public CacheTestBase { apr_terminate(); } - CacheInterface* Cache() override { return cache_.get(); } + CacheInterface* Cache() override { return cache_[0].get(); } ThreadSynchronizer* GetThreadSynchronizer() { - return cache_->GetThreadSynchronizerForTesting(); + return cache_[0]->GetThreadSynchronizerForTesting(); } - scoped_ptr cache_; + std::vector> cache_; scoped_ptr thread_system_; SimpleStats statistics_; MockTimer timer_; @@ -144,15 +151,18 @@ class RedisCacheTest : public CacheTestBase { scoped_ptr custom_server_; static apr_port_t custom_server_port_; + int redis_port_env_; }; apr_port_t RedisCacheTest::custom_server_port_ = 0; // Simple flow of putting in an item, getting it, deleting it. TEST_F(RedisCacheTest, PutGetDelete) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); + CheckPut("Name", "Value"); CheckGet("Name", "Value"); CheckNotFound("Another Name"); @@ -165,15 +175,17 @@ TEST_F(RedisCacheTest, PutGetDelete) { // We're not running against redis cluster, so we don't expect to ever be // redirected, and we should never ask for cluster slots. - EXPECT_EQ(0, cache_->Redirections()); - EXPECT_EQ(0, cache_->ClusterSlotsFetches()); + EXPECT_EQ(0, cache_[0]->Redirections()); + EXPECT_EQ(0, cache_[0]->ClusterSlotsFetches()); } // Make sure curly braces in keys aren't treated specially. TEST_F(RedisCacheTest, CurlyBraces) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); + CheckPut("{1}NameA", "Value1A"); CheckPut("{2}NameB", "Value2B"); CheckPut("{2}NameC", "Value2C"); @@ -185,9 +197,11 @@ TEST_F(RedisCacheTest, CurlyBraces) { // And spaces TEST_F(RedisCacheTest, Spaces) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); + CheckPut("1 NameA", "Value1A"); CheckPut("2 NameB", "Value2B"); CheckPut("2 NameC", "Value2C"); @@ -198,16 +212,18 @@ TEST_F(RedisCacheTest, Spaces) { } TEST_F(RedisCacheTest, MultiGet) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); TestMultiGet(); // Test from CacheTestBase is just fine. } TEST_F(RedisCacheTest, BasicInvalid) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); // Check that we honor callback veto on validity. CheckPut("nameA", "valueA"); @@ -220,15 +236,16 @@ TEST_F(RedisCacheTest, BasicInvalid) { } TEST_F(RedisCacheTest, GetStatus) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); GoogleString status; - cache_->GetStatus(&status); + cache_[0]->GetStatus(&status); // Check that some reasonable info is present. - EXPECT_THAT(status, HasSubstr(cache_->ServerDescription())); + EXPECT_THAT(status, HasSubstr(cache_[0]->ServerDescription())); EXPECT_THAT(status, HasSubstr("redis_version:")); EXPECT_THAT(status, HasSubstr("connected_clients:")); EXPECT_THAT(status, HasSubstr("tcp_port:")); @@ -238,23 +255,50 @@ TEST_F(RedisCacheTest, GetStatus) { // Two following tests are identical and ensure that no keys are leaked between // tests through shared running Redis server. TEST_F(RedisCacheTest, TestsAreIsolated1) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); CheckNotFound(kSomeKey); CheckPut(kSomeKey, kSomeValue); } TEST_F(RedisCacheTest, TestsAreIsolated2) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); CheckNotFound(kSomeKey); CheckPut(kSomeKey, kSomeValue); } +// Test to check multiple redis database(with different index) +TEST_F(RedisCacheTest, TestMultipleDatabases) { + if (!PrepareRedisOrSkip()) { + return; + } + InitRedisWithCustomDatabaseIndex(kDatabaseIndex[0]); + InitRedisWithCustomDatabaseIndex(kDatabaseIndex[1]); + + CheckPut("key1", "value1"); + // adding same key to second database + CheckPut(cache_[1].get(), "key1", "value2"); + + // checking key entries from databases + CheckGet("key1", "value1"); + CheckGet(cache_[1].get(), "key1", "value2"); + + CheckDelete("key1"); + + // checking key deleted from first database + CheckNotFound("key1"); + + // check same key present in second database + CheckGet(cache_[1].get(), "key1", "value2"); +} + class RedisGetRespondingServerThread : public TcpServerThreadForTesting { public: RedisGetRespondingServerThread(apr_port_t listen_port, @@ -268,6 +312,26 @@ class RedisGetRespondingServerThread : public TcpServerThreadForTesting { void HandleClientConnection(apr_socket_t* sock) override { // See http://redis.io/topics/protocol for details. Request is an array of // two bulk strings, answer for GET is a single bulk string. + + // during redis cache startup, Select database command is fired + // being the first command, it is captured by the mock redis server + static const char kSelectRequest[] = + "*2\r\n" + "$6\r\nSELECT\r\n" + "$1\r\n0\r\n"; + static const char kSelectAnswer[] = "+OK\r\n"; + apr_size_t answer_size_select = STATIC_STRLEN(kSelectAnswer); + + char requestBuf[STATIC_STRLEN(kSelectRequest) + 1]; + apr_size_t recvSize = sizeof(requestBuf) - 1; + + apr_socket_recv(sock, requestBuf, &recvSize); + EXPECT_EQ(STATIC_STRLEN(kSelectRequest), recvSize); + requestBuf[recvSize] = 0; + EXPECT_STREQ(kSelectRequest, requestBuf); + + apr_socket_send(sock, kSelectAnswer, &answer_size_select); + static const char kRequest[] = "*2\r\n" "$3\r\nGET\r\n" @@ -291,7 +355,7 @@ class RedisGetRespondingServerThread : public TcpServerThreadForTesting { TEST_F(RedisCacheTest, ReconnectsInstantly) { InitRedisWithCustomServer(); ASSERT_TRUE(StartCustomServer()); - cache_->StartUp(); + cache_[0]->StartUp(); CheckGet(kSomeKey, kSomeValue); // Server closes connection after processing one request, but cache does not @@ -312,7 +376,7 @@ TEST_F(RedisCacheTest, ReconnectsInstantly) { TEST_F(RedisCacheTest, ReconnectsUntilSuccessWithTimeout) { InitRedisWithCustomServer(); ASSERT_TRUE(StartCustomServer()); - cache_->StartUp(); + cache_[0]->StartUp(); CheckGet(kSomeKey, kSomeValue); // Server closes connection after processing one request, but cache does not @@ -341,7 +405,7 @@ TEST_F(RedisCacheTest, ReconnectsUntilSuccessWithTimeout) { TEST_F(RedisCacheTest, ReconnectsIfStartUpFailed) { InitRedisWithCustomServer(); - cache_->StartUp(); + cache_[0]->StartUp(); // Client already knows that connection failed. EXPECT_FALSE(Cache()->IsHealthy()); @@ -360,9 +424,10 @@ TEST_F(RedisCacheTest, ReconnectsIfStartUpFailed) { } TEST_F(RedisCacheTest, DoesNotReconnectAfterShutdown) { - if (!InitRedisOrSkip()) { + if (!PrepareRedisOrSkip()) { return; } + InitRedisWithCustomDatabaseIndex(0); CheckPut(kSomeKey, kSomeValue); CheckGet(kSomeKey, kSomeValue); @@ -402,6 +467,53 @@ class RedisNotRespondingServerThread : public TcpServerThreadForTesting { WorkerTestBase::SyncPoint connection_received_; }; +// This server always waits until connection is received to avoid race +// condition between server destruction and accepting connection (like in +// ShutDownDuringConnection). Other servers do not do that because tests +// actually rely on their answers to client. +class RedisNotRespondingOperationTimeoutServerThread : + public TcpServerThreadForTesting { + public: + RedisNotRespondingOperationTimeoutServerThread(apr_port_t listen_port, + ThreadSystem* thread_system) + : TcpServerThreadForTesting(listen_port, "redis_not_responding_server", + thread_system), + connection_received_(thread_system) {} + + ~RedisNotRespondingOperationTimeoutServerThread() { + connection_received_.Wait(); + ShutDown(); + } + + protected: + void HandleClientConnection(apr_socket_t* sock) override { + // Do nothing, socket will be closed in destructor + + // during redis cache startup, Select database command is fired + // being the first command, it is captured by the mock redis server + static const char kSelectRequest[] = + "*2\r\n" + "$6\r\nSELECT\r\n" + "$1\r\n0\r\n"; + static const char kSelectAnswer[] = "+OK\r\n"; + apr_size_t answer_size_select = STATIC_STRLEN(kSelectAnswer); + + char requestBuf[STATIC_STRLEN(kSelectRequest) + 1]; + apr_size_t recvSize = sizeof(requestBuf) - 1; + + apr_socket_recv(sock, requestBuf, &recvSize); + EXPECT_EQ(STATIC_STRLEN(kSelectRequest), recvSize); + requestBuf[recvSize] = 0; + EXPECT_STREQ(kSelectRequest, requestBuf); + + apr_socket_send(sock, kSelectAnswer, &answer_size_select); + connection_received_.Notify(); + } + + private: + WorkerTestBase::SyncPoint connection_received_; +}; + // These constants are for timeout tests. namespace { // Experiments showed that I/O functions on Linux may sometimes time out @@ -427,9 +539,9 @@ TEST_F(RedisCacheTest, ConnectionTimeout) { InitRedisWithUnreachableServer(); PosixTimer timer; int64 started_at_us = timer.NowUs(); - cache_->StartUp(); // Should try to connect as well. + cache_[0]->StartUp(); // Should try to connect as well. int64 waited_for_us = timer.NowUs() - started_at_us; - EXPECT_FALSE(cache_->IsHealthy()); + EXPECT_FALSE(cache_[0]->IsHealthy()); EXPECT_GE(waited_for_us, kTimedOutOperationMinTimeUs); EXPECT_LE(waited_for_us, kTimedOutOperationMaxTimeUs); } @@ -470,8 +582,12 @@ class GetRequestThread : public ThreadSystem::Thread { TEST_F(RedisCacheTest, IsHealthyDoesNotBlock) { InitRedisWithCustomServer(); StartCustomServer(); + cache_[0]->StartUp(); + + // enabling thread synchronizer after cache start up because + // cache startup fires redis command to select redis database + // and this execution interferes with the test thread synchronization GetThreadSynchronizer()->EnableForPrefix("RedisCommand.After"); - cache_->StartUp(); GetRequestThread thread(Cache(), thread_system_.get()); ASSERT_TRUE(thread.Start()); @@ -488,7 +604,7 @@ TEST_F(RedisCacheTest, ConnectionFastFail) { InitRedisWithCustomServer(); StartCustomServer(); GetThreadSynchronizer()->EnableForPrefix("RedisConnect.After"); - cache_->StartUp(/* connect_now */ false); + cache_[0]->StartUp(/* connect_now */ false); EXPECT_TRUE(Cache()->IsHealthy()); GetRequestThread thread(Cache(), thread_system_.get()); @@ -515,7 +631,7 @@ TEST_F(RedisCacheTest, ShutDownDuringConnection) { InitRedisWithCustomServer(); StartCustomServer(); GetThreadSynchronizer()->EnableForPrefix("RedisConnect.After"); - cache_->StartUp(/* connect_now */ false); + cache_[0]->StartUp(/* connect_now */ false); EXPECT_TRUE(Cache()->IsHealthy()); GetRequestThread thread(Cache(), thread_system_.get()); @@ -542,8 +658,8 @@ class RedisCacheOperationTimeoutTest : public RedisCacheTest { protected: void SetUp() { InitRedisWithCustomServer(); - CHECK(StartCustomServer()); - cache_->StartUp(); + CHECK(StartCustomServer()); + cache_[0]->StartUp(); started_at_us_ = timer_.NowUs(); } diff --git a/pagespeed/system/system_caches.cc b/pagespeed/system/system_caches.cc index e8543bcdda..fd01e8fe7a 100644 --- a/pagespeed/system/system_caches.cc +++ b/pagespeed/system/system_caches.cc @@ -241,11 +241,14 @@ SystemCaches::ExternalCacheInterfaces SystemCaches::NewMemcached( SystemCaches::ExternalCacheInterfaces SystemCaches::NewRedis( SystemRewriteOptions* config) { const ExternalServerSpec& server_spec = config->redis_server(); + // using database index -1 when property not specified in config + const int redis_database_index = + config->has_redis_database_index() ? config->redis_database_index() : -1; RedisCache* redis_server = new RedisCache( server_spec.host, server_spec.port, factory_->thread_system(), factory_->message_handler(), factory_->timer(), config->redis_reconnection_delay_ms(), config->redis_timeout_us(), - factory_->statistics()); + factory_->statistics(), redis_database_index); factory_->TakeOwnership(redis_server); redis_servers_.push_back(redis_server); if (redis_pool_.get() == NULL) { diff --git a/pagespeed/system/system_rewrite_options.cc b/pagespeed/system/system_rewrite_options.cc index a168a72db5..e831514bc9 100644 --- a/pagespeed/system/system_rewrite_options.cc +++ b/pagespeed/system/system_rewrite_options.cc @@ -32,6 +32,7 @@ namespace net_instaweb { namespace { const int64 kDefaultCacheFlushIntervalSec = 5; +const int64 kDefaultRedisDatabaseIndex = 0; const char kFetchHttps[] = "FetchHttps"; @@ -48,6 +49,8 @@ const char SystemRewriteOptions::kRedisServer[] = "RedisServer"; const char SystemRewriteOptions::kRedisReconnectionDelayMs[] = "RedisReconnectionDelayMs"; const char SystemRewriteOptions::kRedisTimeoutUs[] = "RedisTimeoutUs"; +const char SystemRewriteOptions::kRedisDatabaseIndex[] = + "RedisDatabaseIndex"; RewriteOptions::Properties* SystemRewriteOptions::system_properties_ = nullptr; @@ -123,6 +126,11 @@ void SystemRewriteOptions::AddProperties() { SystemRewriteOptions::kRedisTimeoutUs, "Timeout for all Redis operations and connection (us)", true); + AddSystemProperty(kDefaultRedisDatabaseIndex, + &SystemRewriteOptions::redis_database_index_, "rdi", + SystemRewriteOptions::kRedisDatabaseIndex, + "Redis server database index selection", + true); AddSystemProperty(50 * Timer::kMsUs, // 50 ms &SystemRewriteOptions::slow_file_latency_threshold_us_, "asflt", "SlowFileLatencyUs", diff --git a/pagespeed/system/system_rewrite_options.h b/pagespeed/system/system_rewrite_options.h index 4a29f3de0b..21b7493185 100644 --- a/pagespeed/system/system_rewrite_options.h +++ b/pagespeed/system/system_rewrite_options.h @@ -48,6 +48,7 @@ class SystemRewriteOptions : public RewriteOptions { static const char kRedisServer[]; static const char kRedisReconnectionDelayMs[]; static const char kRedisTimeoutUs[]; + static const char kRedisDatabaseIndex[]; static constexpr int kMemcachedDefaultPort = 11211; static constexpr int kRedisDefaultPort = 6379; @@ -187,6 +188,12 @@ class SystemRewriteOptions : public RewriteOptions { int64 redis_timeout_us() const { return redis_timeout_us_.value(); } + int redis_database_index() const { + return redis_database_index_.value(); + } + bool has_redis_database_index() const { + return redis_database_index_.was_set(); + } int64 slow_file_latency_threshold_us() const { return slow_file_latency_threshold_us_.value(); } @@ -499,6 +506,7 @@ class SystemRewriteOptions : public RewriteOptions { Option memcached_timeout_us_; Option redis_reconnection_delay_ms_; Option redis_timeout_us_; + Option redis_database_index_; Option slow_file_latency_threshold_us_; Option file_cache_clean_inode_limit_;