Skip to content

Commit

Permalink
Move shard store init to Application
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Jul 26, 2019
1 parent 401f026 commit 18002e7
Show file tree
Hide file tree
Showing 15 changed files with 340 additions and 409 deletions.
151 changes: 23 additions & 128 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <ripple/basics/PerfLog.h>
#include <ripple/json/json_reader.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/make_Overlay.h>
#include <ripple/protocol/BuildInfo.h>
Expand Down Expand Up @@ -419,9 +420,11 @@ class ApplicationImp

, m_nodeStoreScheduler (*this)

, m_shaMapStore (make_SHAMapStore (*this, setup_SHAMapStore (*config_),
*this, m_nodeStoreScheduler, logs_->journal("SHAMapStore"),
logs_->journal("NodeObject"), m_txMaster, *config_))
, m_shaMapStore(make_SHAMapStore(
*this,
*this,
m_nodeStoreScheduler,
logs_->journal("SHAMapStore")))

, accountIDCache_(128000)

Expand All @@ -443,14 +446,15 @@ class ApplicationImp
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))

//
// Anything which calls addJob must be a descendant of the JobQueue
//
, m_nodeStore (
m_shaMapStore->makeDatabase ("NodeStore.main", 4, *m_jobQueue))
, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))

, shardStore_ (
m_shaMapStore->makeDatabaseShard ("ShardStore", 4, *m_jobQueue))
// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))

, family_ (*this, *m_nodeStore, *m_collectorManager)

Expand Down Expand Up @@ -535,8 +539,13 @@ class ApplicationImp
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this, *shardStore_, *m_collectorManager);
*this,
*shardStore_,
*m_collectorManager);
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -853,9 +862,6 @@ class ApplicationImp
setup.startUp == Config::LOAD_FILE ||
setup.startUp == Config::REPLAY)
{
// perform any needed table updates
updateTxnDB();

// Check if AccountTransactions has primary key
std::string cid, name, type;
std::size_t notnull, dflt_value, pk;
Expand Down Expand Up @@ -913,14 +919,6 @@ class ApplicationImp
bool
initNodeStoreDBs()
{
if (config_->section(ConfigSection::nodeDatabase()).empty())
{
JLOG(m_journal.fatal()) <<
"The [node_db] configuration setting " <<
"has been updated and must be set";
return false;
}

if (config_->doImport)
{
auto j = logs_->journal("NodeObject");
Expand Down Expand Up @@ -1251,7 +1249,6 @@ class ApplicationImp
// and new validations must be greater than this.
std::atomic<LedgerIndex> maxDisallowedLedger_ {0};

void updateTxnDB ();
bool nodeToShards ();
bool validateShards ();
void startGenesisLedger ();
Expand Down Expand Up @@ -1632,6 +1629,9 @@ int ApplicationImp::fdlimit() const
// doubled if online delete is enabled).
needed += std::max(5, m_shaMapStore->fdlimit());

if (shardStore_)
needed += shardStore_->fdlimit();

// One fd per incoming connection a port can accept, or
// if no limit is set, assume it'll handle 256 clients.
for(auto const& p : serverHandler_->setup().ports)
Expand Down Expand Up @@ -2068,111 +2068,6 @@ ApplicationImp::journal (std::string const& name)
return logs_->journal (name);
}

void
ApplicationImp::updateTxnDB()
{
auto schemaHas = [&](std::string const& column)
{
std::string cid, name;
soci::statement st = (mTxnDB->getSession().prepare <<
("PRAGMA table_info(AccountTransactions);"),
soci::into(cid),
soci::into(name));

st.execute();
while (st.fetch())
{
if (name == column)
return true;
}

return false;
};

assert(schemaHas("TransID"));
assert(!schemaHas("foobar"));

if (schemaHas("TxnSeq"))
return;

JLOG (m_journal.warn()) << "Transaction sequence field is missing";

auto& session = getTxnDB ().getSession ();

std::vector< std::pair<uint256, int> > txIDs;
txIDs.reserve (300000);

JLOG (m_journal.info()) << "Parsing transactions";
int i = 0;
uint256 transID;

boost::optional<std::string> strTransId;
soci::blob sociTxnMetaBlob(session);
soci::indicator tmi;
Blob txnMeta;

soci::statement st =
(session.prepare <<
"SELECT TransID, TxnMeta FROM Transactions;",
soci::into(strTransId),
soci::into(sociTxnMetaBlob, tmi));

st.execute ();
while (st.fetch ())
{
if (soci::i_ok == tmi)
convert (sociTxnMetaBlob, txnMeta);
else
txnMeta.clear ();

std::string tid = strTransId.value_or("");
transID.SetHex (tid, true);

if (txnMeta.size () == 0)
{
txIDs.push_back (std::make_pair (transID, -1));
JLOG (m_journal.info()) << "No metadata for " << transID;
}
else
{
TxMeta m (transID, 0, txnMeta);
txIDs.push_back (std::make_pair (transID, m.getIndex ()));
}

if ((++i % 1000) == 0)
{
JLOG (m_journal.info()) << i << " transactions read";
}
}

JLOG (m_journal.info()) << "All " << i << " transactions read";

soci::transaction tr(session);

JLOG (m_journal.info()) << "Dropping old index";
session << "DROP INDEX AcctTxIndex;";

JLOG (m_journal.info()) << "Altering table";
session << "ALTER TABLE AccountTransactions ADD COLUMN TxnSeq INTEGER;";

boost::format fmt ("UPDATE AccountTransactions SET TxnSeq = %d WHERE TransID = '%s';");
i = 0;
for (auto& t : txIDs)
{
session << boost::str (fmt % t.second % to_string (t.first));

if ((++i % 1000) == 0)
{
JLOG (m_journal.info()) << i << " transactions updated";
}
}

JLOG (m_journal.info()) << "Building new index";
session << "CREATE INDEX AcctTxIndex ON AccountTransactions(Account, LedgerSeq, TxnSeq, TransID);";

tr.commit ();
}

bool ApplicationImp::nodeToShards()
{
assert(m_overlay);
Expand Down
37 changes: 5 additions & 32 deletions src/ripple/app/misc/SHAMapStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,12 @@ class TransactionMaster;

/**
* class to create database, launch online delete thread, and
* related sqlite databse
* related SQLite database
*/
class SHAMapStore
: public Stoppable
{
public:
struct Setup
{
explicit Setup() = default;

bool standalone = false;
std::uint32_t deleteInterval = 0;
bool advisoryDelete = false;
std::uint32_t ledgerHistory = 0;
Section nodeDatabase;
std::string databasePath;
std::uint32_t deleteBatch = 100;
std::uint32_t backOff = 100;
std::int32_t ageThreshold = 60;
Section shardDatabase;
};

SHAMapStore (Stoppable& parent) : Stoppable ("SHAMapStore", parent) {}

/** Called by LedgerMaster every time a ledger validates. */
Expand All @@ -62,13 +46,9 @@ class SHAMapStore

virtual std::uint32_t clampFetchDepth (std::uint32_t fetch_depth) const = 0;

virtual std::unique_ptr <NodeStore::Database> makeDatabase (
std::string const& name,
std::int32_t readThreads, Stoppable& parent) = 0;

virtual std::unique_ptr <NodeStore::DatabaseShard> makeDatabaseShard(
std::string const& name, std::int32_t readThreads,
Stoppable& parent) = 0;
virtual
std::unique_ptr <NodeStore::Database>
makeNodeStore(std::string const& name, std::int32_t readThreads) = 0;

/** Highest ledger that may be deleted. */
virtual LedgerIndex setCanDelete (LedgerIndex canDelete) = 0;
Expand All @@ -88,19 +68,12 @@ class SHAMapStore

//------------------------------------------------------------------------------

SHAMapStore::Setup
setup_SHAMapStore(Config const& c);

std::unique_ptr<SHAMapStore>
make_SHAMapStore(
Application& app,
SHAMapStore::Setup const& s,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal,
beast::Journal nodeStoreJournal,
TransactionMaster& transactionMaster,
BasicConfig const& conf);
beast::Journal journal);
}

#endif

0 comments on commit 18002e7

Please sign in to comment.