Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
Add support for redis database selection (#1627)
Browse files Browse the repository at this point in the history
Add support for redis database selection

Fixes #1609
  • Loading branch information
ashishk-1 authored and oschaaf committed Sep 18, 2017
1 parent 8b7e239 commit db6e964
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 53 deletions.
12 changes: 12 additions & 0 deletions html/doc/system.html
Expand Up @@ -475,6 +475,18 @@ <h4 id="redis">Configuring Redis</h4>
<dt>Nginx:<dd><pre class="prettyprint"
>pagespeed RedisReconnectionDelayMs timeout_in_milliseconds;</pre>
</dl>
<p>
You can also select the Redis logical database having the specified numeric index.
This setting defaults to database index 0.
Note that the Redis clustering feature does not allow specifying a database index,
and therefore is incompatible with this setting:
</p>
<dl>
<dt>Apache:<dd><pre class="prettyprint"
>ModPagespeedRedisDatabaseIndex index</pre>
<dt>Nginx:<dd><pre class="prettyprint"
>pagespeed RedisDatabaseIndex index;</pre>
</dl>

<h2 id="flush_cache">Flushing PageSpeed Server-Side Cache</h2>
<p>
Expand Down
1 change: 1 addition & 0 deletions install/debug.conf.template
Expand Up @@ -2251,6 +2251,7 @@ AddType application/javascript .js
#ALL_DIRECTIVES ModPagespeedRedisServer localhost:55555
#ALL_DIRECTIVES ModPagespeedRedisReconnectionDelayMs 1000
#ALL_DIRECTIVES ModPagespeedRedisTimeoutUs 50000
#ALL_DIRECTIVES ModPagespeedRedisDatabaseIndex 0
#ALL_DIRECTIVES ModPagespeedReportUnloadTime true
#ALL_DIRECTIVES ModPagespeedRespectVary true
#ALL_DIRECTIVES ModPagespeedRespectXForwardedProto off
Expand Down
58 changes: 47 additions & 11 deletions pagespeed/system/redis_cache.cc
Expand Up @@ -77,14 +77,16 @@ namespace net_instaweb {
//
// TODO(yeputons): consider removing limit on amount of redirections.
static const int kMaxRedirections = 1;
static const int kDefaultDatabaseIndex = 0;
static const int kRedisDatabaseIndexNotSet = -1;

const char kRedisClusterRedirections[] = "redis_cluster_redirections";
const char kRedisClusterSlotsFetches[] = "redis_cluster_slots_fetches";

RedisCache::RedisCache(StringPiece host, int port, ThreadSystem* thread_system,
MessageHandler* message_handler, Timer* timer,
int64 reconnection_delay_ms, int64 timeout_us,
Statistics* stats)
Statistics* stats, int database_index)
: main_host_(host.as_string()),
main_port_(port),
thread_system_(thread_system),
Expand All @@ -95,7 +97,8 @@ RedisCache::RedisCache(StringPiece host, int port, ThreadSystem* thread_system,
thread_synchronizer_(new ThreadSynchronizer(thread_system)),
connections_lock_(thread_system_->NewRWLock()),
cluster_map_lock_(thread_system_->NewRWLock()),
main_connection_(nullptr) {
main_connection_(nullptr),
database_index_(database_index) {
redirections_ = stats->GetVariable(kRedisClusterRedirections);
cluster_slots_fetches_ = stats->GetVariable(kRedisClusterSlotsFetches);
}
Expand All @@ -118,7 +121,7 @@ void RedisCache::StartUp(bool connect_now) {
CHECK(connections_.empty());
CHECK(!main_connection_);
std::unique_ptr<Connection> conn(
new Connection(this, main_host_, main_port_));
new Connection(this, main_host_, main_port_, database_index_));
main_connection_ = conn.get();
connections_.emplace(StrCat(main_host_, ":", IntegerToString(main_port_)),
std::move(conn));
Expand Down Expand Up @@ -251,7 +254,7 @@ RedisCache::RedisReply RedisCache::RedisCommand(
for (redirections = 0; redirections <= kMaxRedirections;
redirections++,
last_redirecting_connection = conn,
conn = GetOrCreateConnection(redirected_to),
conn = GetOrCreateConnection(redirected_to, kDefaultDatabaseIndex),
redirections_->Add(1)) {
ScopedMutex lock(conn->GetOperationMutex());

Expand Down Expand Up @@ -346,7 +349,7 @@ ExternalServerSpec RedisCache::ParseRedirectionError(StringPiece error) {
}

RedisCache::Connection* RedisCache::GetOrCreateConnection(
ExternalServerSpec spec) {
ExternalServerSpec spec, const int database_index) {
Connection* result;
bool should_start_up = false;
{
Expand All @@ -356,7 +359,8 @@ RedisCache::Connection* RedisCache::GetOrCreateConnection(
if (it == connections_.end()) {
LOG(INFO) << "Initiating connection Redis server at " << spec.ToString();
it = connections_.emplace(name, std::unique_ptr<Connection>(
new Connection(this, spec.host, spec.port))).first;
new Connection(this, spec.host, spec.port, database_index)))
.first;
should_start_up = true;
}
result = it->second.get();
Expand Down Expand Up @@ -453,10 +457,11 @@ void RedisCache::FetchClusterSlotMapping(Connection* connection) {
return;
}
// Everything is there and is the right type. Store it.
// Using database 0 for cluster
new_cluster_mappings.push_back(ClusterMapping(
start_slot_range->integer, end_slot_range->integer,
GetOrCreateConnection(ExternalServerSpec(
master_ip->str, master_port->integer))));
master_ip->str, master_port->integer), kDefaultDatabaseIndex)));
}

// Sort new_cluster_mappings based on start_slot_range_.
Expand Down Expand Up @@ -513,15 +518,16 @@ RedisCache::Connection* RedisCache::LookupConnection(StringPiece key) {
}

RedisCache::Connection::Connection(RedisCache* redis_cache, StringPiece host,
int port)
int port, int database_index)
: redis_cache_(redis_cache),
host_(host.as_string()),
port_(port),
redis_mutex_(redis_cache_->thread_system_->NewMutex()),
state_mutex_(redis_cache_->thread_system_->NewMutex()),
redis_(nullptr),
state_(kShutDown),
next_reconnect_at_ms_(redis_cache_->timer_->NowMs()) {}
next_reconnect_at_ms_(redis_cache_->timer_->NowMs()),
database_index_(database_index) {}

void RedisCache::Connection::StartUp(bool connect_now) {
CHECK_NE("", host_);
Expand All @@ -533,7 +539,7 @@ void RedisCache::Connection::StartUp(bool connect_now) {
state_ = kDisconnected;
}
if (connect_now) {
EnsureConnection();
EnsureConnectionAndDatabaseSelection();
}
}

Expand All @@ -559,6 +565,21 @@ void RedisCache::Connection::ShutDown() {
state_ = kShutDown;
}

bool RedisCache::Connection::EnsureConnectionAndDatabaseSelection() {
{
ScopedMutex lock(state_mutex_.get());
if (state_ == kConnected) {
return true;
}
}

if (!EnsureConnection()) {
return false;
}

return EnsureDatabaseSelection();
}

bool RedisCache::Connection::EnsureConnection() {
{
ScopedMutex lock(state_mutex_.get());
Expand Down Expand Up @@ -599,6 +620,21 @@ bool RedisCache::Connection::EnsureConnection() {
return state_ == kConnected;
}

bool RedisCache::Connection::EnsureDatabaseSelection() {
// dont select database if database index property not specified in config
if (database_index_ != kRedisDatabaseIndexNotSet) {
RedisReply reply = RedisCommand(StrCat("SELECT ",
IntegerToString(database_index_)).c_str(), REDIS_REPLY_STRING);
if (reply == nullptr) {
ScopedMutex lock(state_mutex_.get());
state_ = kDisconnected;
redis_.reset();
return false;
}
}
return true;
}

RedisCache::RedisContext RedisCache::Connection::TryConnect() {
struct timeval timeout;
timeout.tv_sec = redis_cache_->timeout_us_ / Timer::kSecondUs;
Expand Down Expand Up @@ -655,7 +691,7 @@ void RedisCache::Connection::UpdateState() {

RedisCache::RedisReply RedisCache::Connection::RedisCommand(const char* format,
va_list args) {
if (!EnsureConnection()) {
if (!EnsureConnectionAndDatabaseSelection()) {
return nullptr;
}

Expand Down
20 changes: 17 additions & 3 deletions pagespeed/system/redis_cache.h
Expand Up @@ -80,7 +80,7 @@ class RedisCache : public CacheInterface {
RedisCache(StringPiece host, int port, ThreadSystem* thread_system,
MessageHandler* message_handler, Timer* timer,
int64 reconnection_delay_ms, int64 timeout_us,
Statistics* stats);
Statistics* stats, int database_index);
~RedisCache() override { ShutDown(); }

static void InitStats(Statistics* stats);
Expand Down Expand Up @@ -142,7 +142,8 @@ class RedisCache : public CacheInterface {

class Connection {
public:
Connection(RedisCache* redis_cache, StringPiece host, int port);
Connection(RedisCache* redis_cache, StringPiece host, int port,
int database_index);

void StartUp(bool connect_now = true)
LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
Expand Down Expand Up @@ -179,8 +180,14 @@ class RedisCache : public CacheInterface {
bool IsHealthyLockHeld() const EXCLUSIVE_LOCKS_REQUIRED(state_mutex_);
void UpdateState() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_, state_mutex_);

// connects with redis as well as selects redis database
bool EnsureConnectionAndDatabaseSelection()
EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_)
LOCKS_EXCLUDED(state_mutex_);
bool EnsureConnection() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_)
LOCKS_EXCLUDED(state_mutex_);
bool EnsureDatabaseSelection() EXCLUSIVE_LOCKS_REQUIRED(state_mutex_)
LOCKS_EXCLUDED(state_mutex_);

RedisContext TryConnect() LOCKS_EXCLUDED(redis_mutex_, state_mutex_);

Expand All @@ -196,6 +203,10 @@ class RedisCache : public CacheInterface {
State state_ GUARDED_BY(state_mutex_);
int64 next_reconnect_at_ms_ GUARDED_BY(state_mutex_);

// selected database is a property of the connection,
// should re-select it on reconnection
const int database_index_;

DISALLOW_COPY_AND_ASSIGN(Connection);
};
typedef std::map<GoogleString, std::unique_ptr<Connection>> ConnectionsMap;
Expand Down Expand Up @@ -233,7 +244,8 @@ class RedisCache : public CacheInterface {

// Must not be called under Connection::GetOperationLock(), that will cause
// lock inversion and potential theoretical deadlock.
Connection* GetOrCreateConnection(ExternalServerSpec spec);
Connection* GetOrCreateConnection(ExternalServerSpec spec,
const int database_index);

// Ask redis what keys should go to which servers.
void FetchClusterSlotMapping(Connection* connection)
Expand Down Expand Up @@ -267,6 +279,8 @@ class RedisCache : public CacheInterface {
// Not guarded, but should only be modified in StartUp().
Connection* main_connection_;

const int database_index_;

friend class RedisCacheTest;
DISALLOW_COPY_AND_ASSIGN(RedisCache);
};
Expand Down
3 changes: 2 additions & 1 deletion pagespeed/system/redis_cache_cluster_test.cc
Expand Up @@ -47,6 +47,7 @@ namespace {
static const int kReconnectionDelayMs = 10;
static const int kTimeoutUs = 100 * Timer::kMsUs;
static const int kSlaveNodesFlushingTimeoutMs = 1000;
static const int kDatabaseIndex = 0;

// One can check following constants with CLUSTER KEYSLOT command.
// For testing purposes, both KEY and {}KEY should be in the same slot range.
Expand Down Expand Up @@ -101,7 +102,7 @@ class RedisCacheClusterTest : public CacheTestBase {
// Setting up cache.
cache_.reset(new RedisCache("localhost", ports_[0], thread_system_.get(),
&handler_, &timer_, kReconnectionDelayMs,
kTimeoutUs, &statistics_));
kTimeoutUs, &statistics_, kDatabaseIndex));
cache_->StartUp();
return true;
}
Expand Down

0 comments on commit db6e964

Please sign in to comment.