Skip to content

Commit

Permalink
Maintain zookeeper lock during the whole database initialization
Browse files Browse the repository at this point in the history
To ensure Cassandra schema consistency, a ZooKeeper lock is
used so that only one collector node creates the schema at
any given point in time. However, if schema creation failed,
we released the ZooKeeper lock and retried. From Cassandra's
perspective, this meant the schema was being created
concurrently, which caused a column family ID mismatch. We now
release the ZooKeeper lock only once schema creation has
succeeded.

Change-Id: I1af4ef147ec31d44bce258b5af319589e27eb64e
Closes-Bug: #1719830
  • Loading branch information
Megh Bhatt committed Sep 29, 2017
1 parent 54833d2 commit 2302f77
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 55 deletions.
59 changes: 23 additions & 36 deletions src/analytics/db_handler.cc
Expand Up @@ -61,8 +61,6 @@ DbHandler::DbHandler(EventManager *evm,
GenDb::GenDbIf::DbErrorHandler err_handler,
std::string name,
const Options::Cassandra &cassandra_options,
const std::string &zookeeper_server_list,
bool use_zookeeper,
bool use_db_write_options,
const DbWriteOptions &db_write_options,
const std::vector<std::string> &api_server_list,
Expand All @@ -78,8 +76,6 @@ DbHandler::DbHandler(EventManager *evm,
cassandra_options.flow_tables_compaction_strategy_),
gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN,
(uint8_t)g_viz_constants.PARTITION_MAX),
zookeeper_server_list_(zookeeper_server_list),
use_zookeeper_(use_zookeeper),
disable_all_writes_(cassandra_options.disable_all_db_writes_),
disable_statistics_writes_(cassandra_options.disable_db_stats_writes_),
disable_messages_writes_(cassandra_options.disable_db_messages_writes_),
Expand Down Expand Up @@ -507,13 +503,6 @@ void DbHandler::UnInit() {
dbif_->Db_SetInitDone(false);
}

// Uninitializes the database interface through its unlocked variant
// (Db_UninitUnlocked()) and then marks initialization as not done.
// The caller *SHOULD* ensure that UnInit() is not called from another
// task that can be executed in parallel.
void DbHandler::UnInitUnlocked() {
dbif_->Db_UninitUnlocked();
dbif_->Db_SetInitDone(false);
}

bool DbHandler::Init(bool initial) {
SetDropLevel(0, SandeshLevel::INVALID, NULL);
if (initial) {
Expand All @@ -523,7 +512,7 @@ bool DbHandler::Init(bool initial) {
}
}

bool DbHandler::InitializeInternal() {
bool DbHandler::Initialize() {
DB_LOG(DEBUG, "Initializing..");

/* init of vizd table structures */
Expand All @@ -550,25 +539,6 @@ bool DbHandler::InitializeInternal() {
return true;
}

// Runs InitializeInternal() while holding the "/collector" zookeeper lock so
// that creation is synchronized across nodes.
//
// Returns the result of InitializeInternal(). If the lock cannot be acquired
// in a build where assert() is compiled out, returns false without running
// InitializeInternal().
bool DbHandler::InitializeInternalLocked() {
    // Synchronize creation across nodes using zookeeper
    zookeeper::client::ZookeeperClient client(name_.c_str(),
        zookeeper_server_list_.c_str());
    zookeeper::client::ZookeeperLock dmutex(&client, "/collector");
    // Lock()/Release() must be called outside of assert(): when NDEBUG is
    // defined the assert argument is not evaluated at all, so the lock
    // would never be taken (or released).
    const bool locked(dmutex.Lock());
    assert(locked);
    if (!locked) {
        // Never proceed without the lock.
        return false;
    }
    const bool success(InitializeInternal());
    const bool released(dmutex.Release());
    assert(released);
    (void)released;  // silence unused-variable warning when NDEBUG is set
    return success;
}

// Initialization entry point: takes the zookeeper-locked path when
// use_zookeeper_ is set, otherwise initializes directly.
bool DbHandler::Initialize() {
    return use_zookeeper_ ? InitializeInternalLocked()
                          : InitializeInternal();
}

bool DbHandler::Setup() {
DB_LOG(DEBUG, "Setup..");
if (!dbif_->Db_Init()) {
Expand Down Expand Up @@ -2324,6 +2294,8 @@ bool DbHandler::UnderlayFlowSampleInsert(const UFlowData& flow_data,
return true;
}

using namespace zookeeper::client;

DbHandlerInitializer::DbHandlerInitializer(EventManager *evm,
const std::string &db_name, const std::string &timer_task_name,
DbHandlerInitializer::InitializeDoneCb callback,
Expand All @@ -2336,13 +2308,20 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm,
db_name_(db_name),
db_handler_(new DbHandler(evm,
boost::bind(&DbHandlerInitializer::ScheduleInit, this),
db_name,
cassandra_options, zookeeper_server_list, use_zookeeper,
db_name, cassandra_options,
true, db_write_options, api_server_list, api_config)),
callback_(callback),
db_init_timer_(TimerManager::CreateTimer(*evm->io_service(),
db_name + " Db Init Timer",
TaskScheduler::GetInstance()->GetTaskId(timer_task_name))) {
TaskScheduler::GetInstance()->GetTaskId(timer_task_name))),
zookeeper_server_list_(zookeeper_server_list),
use_zookeeper_(use_zookeeper),
zoo_locked_(false) {
if (use_zookeeper_) {
zoo_client_.reset(new ZookeeperClient(db_name_.c_str(),
zookeeper_server_list_.c_str()));
zoo_mutex_.reset(new ZookeeperLock(zoo_client_.get(), "/collector"));
}
}

DbHandlerInitializer::DbHandlerInitializer(EventManager *evm,
Expand All @@ -2361,7 +2340,11 @@ DbHandlerInitializer::~DbHandlerInitializer() {
}

bool DbHandlerInitializer::Initialize() {
boost::system::error_code ec;
// Synchronize creation across nodes using zookeeper
if (use_zookeeper_ && !zoo_locked_) {
assert(zoo_mutex_->Lock());
zoo_locked_ = true;
}
if (!db_handler_->Init(true)) {
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
Expand All @@ -2371,6 +2354,10 @@ bool DbHandlerInitializer::Initialize() {
ScheduleInit();
return false;
}
if (use_zookeeper_ && zoo_locked_) {
assert(zoo_mutex_->Release());
zoo_locked_ = false;
}
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
db_name_, ConnectionStatus::UP, db_handler_->GetEndpoints(),
Expand Down Expand Up @@ -2413,6 +2400,6 @@ void DbHandlerInitializer::StartInitTimer() {
}

// Uninitializes the DB handler and then starts the init timer.
// NOTE(review): presumably the timer re-runs initialization later - confirm
// against StartInitTimer().
void DbHandlerInitializer::ScheduleInit() {
db_handler_->UnInitUnlocked();
db_handler_->UnInit();
StartInitTimer();
}
19 changes: 12 additions & 7 deletions src/analytics/db_handler.h
Expand Up @@ -90,8 +90,6 @@ class DbHandler {
DbHandler(EventManager *evm, GenDb::GenDbIf::DbErrorHandler err_handler,
std::string name,
const Options::Cassandra &cassandra_options,
const std::string &zookeeper_server_list,
bool use_zookeeper,
bool use_db_write_options,
const DbWriteOptions &db_write_options,
const std::vector<std::string> &api_server_list,
Expand All @@ -106,7 +104,6 @@ class DbHandler {
bool DropMessage(const SandeshHeader &header, const VizMsg *vmsg);
bool Init(bool initial);
void UnInit();
void UnInitUnlocked();
void GetRuleMap(RuleMap& rulemap);

virtual void MessageTableInsert(const VizMsg *vmsgp,
Expand Down Expand Up @@ -215,8 +212,6 @@ class DbHandler {
boost::function<void (void)> cb);
bool Setup();
bool Initialize();
bool InitializeInternal();
bool InitializeInternalLocked();
bool StatTableWrite(uint32_t t2,
const std::string& statName, const std::string& statAttr,
const std::pair<std::string,DbHandler::Var>& ptag,
Expand Down Expand Up @@ -257,8 +252,6 @@ class DbHandler {
std::string compaction_strategy_;
std::string flow_tables_compaction_strategy_;
UniformInt8RandomGenerator gen_partition_no_;
std::string zookeeper_server_list_;
bool use_zookeeper_;
bool disable_all_writes_;
bool disable_statistics_writes_;
bool disable_messages_writes_;
Expand Down Expand Up @@ -303,6 +296,13 @@ inline std::ostream& operator<<(std::ostream& out, const DbHandler::Var& value)
return out;
}

// Forward declarations of the zookeeper client types, so they can be named
// (e.g. as smart-pointer element types) without their full definitions.
namespace zookeeper { namespace client {

class ZookeeperLock;
class ZookeeperClient;

} }  // namespace zookeeper::client

//
// DbHandlerInitializer - Wrapper to perform DbHandler initialization
//
Expand Down Expand Up @@ -339,6 +339,11 @@ class DbHandlerInitializer {
DbHandlerPtr db_handler_;
InitializeDoneCb callback_;
Timer *db_init_timer_;
std::string zookeeper_server_list_;
bool use_zookeeper_;
bool zoo_locked_;
boost::scoped_ptr<zookeeper::client::ZookeeperClient> zoo_client_;
boost::scoped_ptr<zookeeper::client::ZookeeperLock> zoo_mutex_;
};

/*
Expand Down
2 changes: 0 additions & 2 deletions src/analytics/protobuf_collector.h
Expand Up @@ -11,8 +11,6 @@

#include "analytics/protobuf_server.h"

class DbHandlerInitializer;

class ProtobufCollector {
public:
ProtobufCollector(EventManager *evm, uint16_t udp_server_port,
Expand Down
4 changes: 2 additions & 2 deletions src/analytics/test/db_handler_mock.h
Expand Up @@ -13,8 +13,8 @@ class DbHandlerMock : public DbHandler {
DbHandlerMock(EventManager *evm, const Options::Cassandra cassandra_options) :
DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this),
"localhost",
cassandra_options, "",
false, false,
cassandra_options,
false,
DbWriteOptions(),
std::vector<std::string>(),
VncApiConfig()) {
Expand Down
2 changes: 0 additions & 2 deletions src/analytics/test/db_handler_test.cc
Expand Up @@ -195,8 +195,6 @@ class DbHandlerMsgKeywordInsertTest : public ::testing::Test {
db_handler_.reset(new DbHandler(evm_.get(), boost::bind(&DbHandler::UnInit, db_handler_.get()),
"localhost",
cassandra_options_,
"",
false,
false,
DbWriteOptions(),
std::vector<std::string>(),
Expand Down
4 changes: 0 additions & 4 deletions src/database/cassandra/cql/cql_if.cc
Expand Up @@ -2451,10 +2451,6 @@ bool CqlIf::Db_Init() {
}

// Uninitializes the CQL interface by delegating to Db_UninitUnlocked().
void CqlIf::Db_Uninit() {
Db_UninitUnlocked();
}

// Disconnects by calling impl_->DisconnectSync().
// NOTE(review): the "Sync" suffix suggests this blocks until the disconnect
// completes - confirm against the impl class.
void CqlIf::Db_UninitUnlocked() {
impl_->DisconnectSync();
}

Expand Down
1 change: 0 additions & 1 deletion src/database/cassandra/cql/cql_if.h
Expand Up @@ -32,7 +32,6 @@ class CqlIf : public GenDb::GenDbIf {
// Init/Uninit
virtual bool Db_Init();
virtual void Db_Uninit();
virtual void Db_UninitUnlocked();
virtual void Db_SetInitDone(bool);
// Tablespace
virtual bool Db_SetTablespace(const std::string &tablespace);
Expand Down
1 change: 0 additions & 1 deletion src/database/gendb_if.h
Expand Up @@ -259,7 +259,6 @@ class GenDbIf {
// Init/Uninit
virtual bool Db_Init() = 0;
virtual void Db_Uninit() = 0;
virtual void Db_UninitUnlocked() = 0;
virtual void Db_SetInitDone(bool init_done) = 0;
// Tablespace
virtual bool Db_SetTablespace(const std::string& tablespace) = 0;
Expand Down

1 comment on commit 2302f77

@Groundsea
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this bug have a workaround? I ran into this issue, and I can't apply the patch right now.
I restarted the contrail service, but that had no effect.

Please sign in to comment.