Skip to content

Commit

Permalink
Move shard store init to Application
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Jul 12, 2019
1 parent 40583ea commit 271d9c9
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 228 deletions.
135 changes: 20 additions & 115 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <ripple/basics/PerfLog.h>
#include <ripple/json/json_reader.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/make_Overlay.h>
#include <ripple/protocol/Feature.h>
Expand Down Expand Up @@ -448,9 +449,6 @@ class ApplicationImp
, m_nodeStore (
m_shaMapStore->makeDatabase ("NodeStore.main", 4, *m_jobQueue))

, shardStore_ (
m_shaMapStore->makeDatabaseShard ("ShardStore", 4, *m_jobQueue))

, family_ (*this, *m_nodeStore, *m_collectorManager)

, m_orderBookDB (*this, *m_jobQueue)
Expand Down Expand Up @@ -533,9 +531,22 @@ class ApplicationImp
, m_io_latency_sampler (m_collectorManager->collector()->make_event ("ios_latency"),
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
{
if (shardStore_)
sFamily_ = std::make_unique<detail::AppFamily>(
*this, *shardStore_, *m_collectorManager);
if (!config_->section(ConfigSection::shardDatabase()).empty())
{
shardStore_ = make_DatabaseShard(
*this,
"ShardStore",
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore"));
if (shardStore_->init())
sFamily_ = std::make_unique<detail::AppFamily>(
*this, *shardStore_, *m_collectorManager);
else
shardStore_.reset();
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -852,9 +863,6 @@ class ApplicationImp
setup.startUp == Config::LOAD_FILE ||
setup.startUp == Config::REPLAY)
{
// perform any needed table updates
updateTxnDB();

// Check if AccountTransactions has primary key
std::string cid, name, type;
std::size_t notnull, dflt_value, pk;
Expand Down Expand Up @@ -1250,7 +1258,6 @@ class ApplicationImp
// and new validations must be greater than this.
std::atomic<LedgerIndex> maxDisallowedLedger_ {0};

void updateTxnDB ();
bool nodeToShards ();
bool validateShards ();
void startGenesisLedger ();
Expand Down Expand Up @@ -1629,6 +1636,9 @@ int ApplicationImp::fdlimit() const
// doubled if online delete is enabled).
needed += std::max(5, m_shaMapStore->fdlimit());

if (shardStore_)
needed += shardStore_->fdlimit();

// One fd per incoming connection a port can accept, or
// if no limit is set, assume it'll handle 256 clients.
for(auto const& p : serverHandler_->setup().ports)
Expand Down Expand Up @@ -2065,111 +2075,6 @@ ApplicationImp::journal (std::string const& name)
return logs_->journal (name);
}

void
ApplicationImp::updateTxnDB()
{
auto schemaHas = [&](std::string const& column)
{
std::string cid, name;
soci::statement st = (mTxnDB->getSession().prepare <<
("PRAGMA table_info(AccountTransactions);"),
soci::into(cid),
soci::into(name));

st.execute();
while (st.fetch())
{
if (name == column)
return true;
}

return false;
};

assert(schemaHas("TransID"));
assert(!schemaHas("foobar"));

if (schemaHas("TxnSeq"))
return;

JLOG (m_journal.warn()) << "Transaction sequence field is missing";

auto& session = getTxnDB ().getSession ();

std::vector< std::pair<uint256, int> > txIDs;
txIDs.reserve (300000);

JLOG (m_journal.info()) << "Parsing transactions";
int i = 0;
uint256 transID;

boost::optional<std::string> strTransId;
soci::blob sociTxnMetaBlob(session);
soci::indicator tmi;
Blob txnMeta;

soci::statement st =
(session.prepare <<
"SELECT TransID, TxnMeta FROM Transactions;",
soci::into(strTransId),
soci::into(sociTxnMetaBlob, tmi));

st.execute ();
while (st.fetch ())
{
if (soci::i_ok == tmi)
convert (sociTxnMetaBlob, txnMeta);
else
txnMeta.clear ();

std::string tid = strTransId.value_or("");
transID.SetHex (tid, true);

if (txnMeta.size () == 0)
{
txIDs.push_back (std::make_pair (transID, -1));
JLOG (m_journal.info()) << "No metadata for " << transID;
}
else
{
TxMeta m (transID, 0, txnMeta);
txIDs.push_back (std::make_pair (transID, m.getIndex ()));
}

if ((++i % 1000) == 0)
{
JLOG (m_journal.info()) << i << " transactions read";
}
}

JLOG (m_journal.info()) << "All " << i << " transactions read";

soci::transaction tr(session);

JLOG (m_journal.info()) << "Dropping old index";
session << "DROP INDEX AcctTxIndex;";

JLOG (m_journal.info()) << "Altering table";
session << "ALTER TABLE AccountTransactions ADD COLUMN TxnSeq INTEGER;";

boost::format fmt ("UPDATE AccountTransactions SET TxnSeq = %d WHERE TransID = '%s';");
i = 0;
for (auto& t : txIDs)
{
session << boost::str (fmt % t.second % to_string (t.first));

if ((++i % 1000) == 0)
{
JLOG (m_journal.info()) << i << " transactions updated";
}
}

JLOG (m_journal.info()) << "Building new index";
session << "CREATE INDEX AcctTxIndex ON AccountTransactions(Account, LedgerSeq, TxnSeq, TransID);";

tr.commit ();
}

bool ApplicationImp::nodeToShards()
{
assert(m_overlay);
Expand Down
7 changes: 1 addition & 6 deletions src/ripple/app/misc/SHAMapStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TransactionMaster;

/**
* class to create database, launch online delete thread, and
* related sqlite databse
* related SQLite database
*/
class SHAMapStore
: public Stoppable
Expand All @@ -50,7 +50,6 @@ class SHAMapStore
std::uint32_t deleteBatch = 100;
std::uint32_t backOff = 100;
std::int32_t ageThreshold = 60;
Section shardDatabase;
};

SHAMapStore (Stoppable& parent) : Stoppable ("SHAMapStore", parent) {}
Expand All @@ -66,10 +65,6 @@ class SHAMapStore
std::string const& name,
std::int32_t readThreads, Stoppable& parent) = 0;

virtual std::unique_ptr <NodeStore::DatabaseShard> makeDatabaseShard(
std::string const& name, std::int32_t readThreads,
Stoppable& parent) = 0;

/** Highest ledger that may be deleted. */
virtual LedgerIndex setCanDelete (LedgerIndex canDelete) = 0;

Expand Down
66 changes: 0 additions & 66 deletions src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/impl/DatabaseRotatingImp.h>
#include <ripple/nodestore/impl/DatabaseShardImp.h>

namespace ripple {
void SHAMapStoreImp::SavedStateDB::init (BasicConfig const& config,
Expand Down Expand Up @@ -200,52 +199,6 @@ SHAMapStoreImp::SHAMapStoreImp (

dbPaths();
}
if (! setup_.shardDatabase.empty())
{
// The node and shard stores must use
// the same earliest ledger sequence
std::array<std::uint32_t, 2> seq;
if (get_if_exists<std::uint32_t>(
setup_.nodeDatabase, "earliest_seq", seq[0]))
{
if (get_if_exists<std::uint32_t>(
setup_.shardDatabase, "earliest_seq", seq[1]) &&
seq[0] != seq[1])
{
Throw<std::runtime_error>("earliest_seq set more than once");
}
}

boost::filesystem::path dbPath =
get<std::string>(setup_.shardDatabase, "path");
if (dbPath.empty())
Throw<std::runtime_error>("shard path missing");
if (boost::filesystem::exists(dbPath))
{
if (! boost::filesystem::is_directory(dbPath))
Throw<std::runtime_error>("shard db path must be a directory.");
}
else
boost::filesystem::create_directories(dbPath);

auto const maxDiskSpace = get<std::uint64_t>(
setup_.shardDatabase, "max_size_gb", 0);
// Must be large enough for one shard
if (maxDiskSpace < 3)
Throw<std::runtime_error>("max_size_gb too small");
if ((maxDiskSpace << 30) < maxDiskSpace)
Throw<std::runtime_error>("overflow max_size_gb");

std::uint32_t lps;
if (get_if_exists<std::uint32_t>(
setup_.shardDatabase, "ledgers_per_shard", lps))
{
// ledgers_per_shard to be set only in standalone for testing
if (! setup_.standalone)
Throw<std::runtime_error>(
"ledgers_per_shard only honored in stand alone");
}
}
}

std::unique_ptr <NodeStore::Database>
Expand Down Expand Up @@ -283,24 +236,6 @@ SHAMapStoreImp::makeDatabase (std::string const& name,
return db;
}

std::unique_ptr<NodeStore::DatabaseShard>
SHAMapStoreImp::makeDatabaseShard(std::string const& name,
std::int32_t readThreads, Stoppable& parent)
{
std::unique_ptr<NodeStore::DatabaseShard> db;
if(! setup_.shardDatabase.empty())
{
db = std::make_unique<NodeStore::DatabaseShardImp>(
app_, name, parent, scheduler_, readThreads,
setup_.shardDatabase, app_.journal("ShardStore"));
if (db->init())
fdlimit_ += db->fdlimit();
else
db.reset();
}
return db;
}

void
SHAMapStoreImp::onLedgerClosed(
std::shared_ptr<Ledger const> const& ledger)
Expand Down Expand Up @@ -763,7 +698,6 @@ setup_SHAMapStore (Config const& c)
get_if_exists (setup.nodeDatabase, "backOff", setup.backOff);
get_if_exists (setup.nodeDatabase, "age_threshold", setup.ageThreshold);

setup.shardDatabase = c.section(ConfigSection::shardDatabase());
return setup;
}

Expand Down
4 changes: 0 additions & 4 deletions src/ripple/app/misc/SHAMapStoreImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ class SHAMapStoreImp : public SHAMapStore
std::string const&name,
std::int32_t readThreads, Stoppable& parent) override;

std::unique_ptr <NodeStore::DatabaseShard>
makeDatabaseShard(std::string const& name,
std::int32_t readThreads, Stoppable& parent) override;

LedgerIndex
setCanDelete (LedgerIndex seq) override
{
Expand Down
16 changes: 13 additions & 3 deletions src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DatabaseShard : public Database
@param parent The parent Stoppable
@param scheduler The scheduler to use for performing asynchronous tasks
@param readThreads The number of async read threads to create
@param config The configuration for the database
@param config The shard configuration section for the database
@param journal Destination for logging output
*/
DatabaseShard(
Expand Down Expand Up @@ -89,9 +89,9 @@ class DatabaseShard : public Database
bool
prepareShard(std::uint32_t shardIndex) = 0;

/** Remove shard indexes from prepared import
/** Remove a previously prepared shard index for import
@param indexes Shard indexes to be removed from import
@param shardIndex Shard index to be removed from import
*/
virtual
void
Expand Down Expand Up @@ -219,6 +219,16 @@ seqToShardIndex(std::uint32_t seq,
return (seq - 1) / ledgersPerShard;
}

extern
std::unique_ptr<DatabaseShard>
make_DatabaseShard(
Application& app,
std::string const& name,
Stoppable& parent,
Scheduler& scheduler,
int readThreads,
beast::Journal j);

}
}

Expand Down

0 comments on commit 271d9c9

Please sign in to comment.