
TableDB Raft

Building on the NPL Raft implementation, the TableDB Raft implementation is much easier. The design can still differ considerably, as a comparison between actordb and rqlite shows:

  • rqlite is simple: it just uses SQL statements as Raft log entries.
  • actordb is more complicated:

    Actors are replicated using the Raft distributed consensus protocol. Raft requires a write log to operate. Because our two engines are connected through the SQLite WAL module, Raft replication is a natural fit. Every write to the database is an append to WAL. For every append we send that data to the entire cluster to be replicated. Pages are simply inserted to WAL on all nodes. This means the leader executes the SQL, but the followers just append to WAL.

Log Entry

We reuse the msg in IORequest:Send; the log entry looks like this:

function RaftLogEntryValue:new(query_type, collection, query, index, serverId)
    local o = {
      query_type = query_type,           -- e.g. findOne, updateOne, insertOne
      collection = collection:ToData(),  -- serialized collection info
      query = query,                     -- the query table of the original call
      cb_index = index,                  -- callback index used to route the response
      serverId = serverId,               -- the server that should receive the response
    };
    setmetatable(o, self);
    return o;
end
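For illustration, here is how such a log entry might be built for an updateOne call; the collection object, the index and serverId values, and the toBytes counterpart to fromBytes are assumptions used only to show the field layout:

-- hypothetical example: wrap an updateOne request as a Raft log entry.
-- userCollection, the index and serverId values are made up for illustration;
-- a toBytes counterpart to fromBytes is assumed for serialization.
local raftLogEntryValue = RaftLogEntryValue:new(
    "updateOne",                                  -- query_type: the collection method to invoke
    userCollection,                               -- the TableDB collection object
    { query = { name = "LiXizhi" },               -- selector, read back as query.query in commit
      update = { name = "LiXizhi", email = "lixizhi@example.com" } },  -- read back as query.update
    1,                                            -- cb_index: routes the response to the caller
    "server1:rtdb");                              -- serverId of the requesting client thread
local bytes = raftLogEntryValue:toBytes();        -- appended to the Raft log and replicated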

State Machine

The commit in the state machine looks like this:

--[[
 * Commit the log data at the {@code logIndex}
 * @param logIndex the log index in the logStore
 * @param data the serialized RaftLogEntryValue of this log entry
 ]]--
function RaftTableDB:commit(logIndex, data)
    -- data is logEntry.value
    local raftLogEntryValue = RaftLogEntryValue:fromBytes(data);
    -- callback that sends the query result back to the requesting server,
    -- using cb_index to route it to the original caller
    local cbFunc = function(err, data)
        local msg = {
            err = err,
            data = data,
            cb_index = raftLogEntryValue.cb_index,
        }
        -- send Response
        RTDBRequestRPC(nil, raftLogEntryValue.serverId, msg)
    end;
    -- dispatch to the collection method named by query_type (findOne, updateOne, ...);
    -- `collection` is the TableDB collection referenced by raftLogEntryValue.collection
    collection[raftLogEntryValue.query_type](collection, raftLogEntryValue.query.query,
            raftLogEntryValue.query.update or raftLogEntryValue.query.replacement,
            cbFunc);
    self.commitIndex = logIndex;
end

Interface

The client interface remains unchanged; we provide script/TableDB/RaftSqliteStore.lua. This also requires adding a StorageProvider:SetStorageClass(raftSqliteStore) method to StorageProvider. In each interface method, RaftSqliteStore sends the log entry described above to the Raft cluster, and consistency levels could also be taken into account here.
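A minimal client-side sketch of wiring this together, assuming the standard TableDatabase/StorageProvider load paths and the SetStorageClass method mentioned above; the exact paths and call order are assumptions, not the definitive setup:

-- sketch only: switch TableDatabase to the Raft-backed store
-- (load paths and call order are assumptions)
NPL.load("(gl)script/ide/System/Database/TableDatabase.lua");
NPL.load("(gl)script/ide/System/Database/StorageProvider.lua");
NPL.load("(gl)npl_mod/TableDB/RaftSqliteStore.lua");
local TableDatabase = commonlib.gettable("System.Database.TableDatabase");
local StorageProvider = commonlib.gettable("System.Database.StorageProvider");
local RaftSqliteStore = commonlib.gettable("TableDB.RaftSqliteStore");

-- make new collections use the Raft provider instead of plain sqlite
StorageProvider:SetStorageClass(RaftSqliteStore);

-- the normal TableDatabase client code stays unchanged
local db = TableDatabase:new():connect("temp/test_raft_database/");
db.User:insertOne(nil, { name = "LiXizhi", email = "lixizhi@example.com" }, function(err, data)
    echo(data);  -- the response arrives once the entry is committed by the Raft cluster
end);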

Callbacks

Like the original interfaces, callbacks are implemented asynchronously. However, we make connect synchronous to reduce the impact.

Snapshot

Like rqlite, we use SQLite's Online Backup API to create snapshots.
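For reference, a minimal sketch of the Online Backup flow, written against an lsqlite3-style Lua binding; the binding functions here are an assumption and not the actual binding used by TableDB, but they mirror the underlying sqlite3_backup_init/step/finish C calls. The file names follow the snapshot naming described in the Usage section below.

-- sketch only: copy a live collection db into a snapshot file
-- (assumes an lsqlite3-style binding)
local sqlite3 = require("lsqlite3");

local src = sqlite3.open("temp/test_raft_database/User.db");
local dst = sqlite3.open("snapshot/12-1_User_s.db");

-- sqlite3_backup_init / step / finish
local bu = sqlite3.backup_init(dst, "main", src, "main");
if bu then
    bu:step(-1);    -- copy all pages in one pass
    bu:finish();
end
dst:close();
src:close();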

Implementation

The core Raft logic is in RaftServer. TableDB is implemented as a Raft state machine; see RaftTableDBStateMachine. Below is Figure 1 from the Raft thesis, which illustrates the replicated state machine architecture.

[Figure 1: replicated state machine architecture]

Source Code Architecture

Server side

TableDBApp:App -> create RpcListener && TableDB StateMachine -> set RaftParameters -> create RaftContext -> RaftConsensus.run
	RaftConsensus.run -> create RaftServer -> start TableDB StateMachine -> RpcListener:startListening
		RaftServer: has several PeerServers and, when it is the leader, sends heartbeats at a small random interval
		RpcListener: Raft level, routes messages to RaftServer
		TableDB StateMachine: TableDB level, calls messageSender to send messages to RaftServer

Client side

TableDBApp:App -> create RaftSqliteStore && TableDB StateMachine -> create RaftClient -> RaftSqliteStore:setRaftClient -> send commands to the cluster
	commands: appendEntries, addServer, removeServer.
	sending commands to the cluster will be retried.
	RaftSqliteStore: uses RaftClient to send various commands to the cluster and handles the responses.
	TableDB StateMachine: sends the response back to the client once the command is committed.

Usage

Below is the structure of a server's directory:

.
├── assets.log
├── cluster.json
├── config.properties
├── log.txt
├── server.state
├── snapshot
│   ├── 12-1_User_s.db
│   ├── 12.cnf
│   ├── 18-1_User_s.db
│   ├── 18.cnf
│   ├── 24-1_User_s.db
│   ├── 24.cnf
│   ├── 6-1_User_s.db
│   └── 6.cnf
├── store.data
├── store.data.bak
├── store.idx
├── store.idx.bak
├── store.sti
├── store.sti.bak
└── temp
    └── test_raft_database
        ├── User.db
        ├── User.db-shm
        └── User.db-wal
  • assets.log and log.txt are NPL's log files.
  • cluster.json holds the cluster servers' information; see below.
  • config.properties stores this server's id.
  • server.state stores this server's Raft state, namely term, commitIndex and votedFor.
  • snapshot is the directory containing the snapshots of the state machine.
    • each snapshot consists of 2 files.
    • $logIndex-$term_$collection_s.db is the snapshot data of the state machine.
    • $logIndex.cnf is the cluster configuration at that logIndex.
  • store.data stores this server's Raft log entry data, and store.data.bak is its backup.
  • store.idx stores the index data of store.data, i.e. the file position of each log entry in store.data; store.idx.bak is its backup.
  • store.sti stores this server's log start index, and store.sti.bak is its backup.
  • temp/test_raft_database is the rootFolder of TableDatabase.

Note: we may move some of the state above into sqlite, like actordb does.

Config

To use TableDB Raft, we need to add a tabledb.config.xml file in the TableDB rootFolder.

Note that npl_package/main should not contain the rootFolder, otherwise it will affect the application's rootFolder and its config. Also pay attention to the Development directory and WorkingDir.

<tabledb>
	<providers>
		<provider type="TableDB.RaftSqliteStore" name="raft" file="(g1)npl_mod/TableDB/RaftSqliteStore.lua">./,localhost,9004,4,rtdb
		</provider>
	</providers>
	<tables>
		<table provider="raft" name="User1"/>
		<table provider="raft" name="User2"/>
		<table provider="raft" name="default" />
	</tables>
</tabledb>

Above is an example of the Raft provider config.

  • a provider has 4 config attributes:
    • type is used in commonlib.gettable(provider.attr.type).
    • file is used in NPL.load(provider.attr.file).
    • name is the name of the provider, which is referenced by the table config.
    • the element's text content (the values) is the init args passed to the provider type's init method.
  • a table has 2 config attributes:
    • provider is the name of the provider, corresponding to the name of a provider entry.
    • name is the table name. If the name is default, the corresponding provider is set as the default provider of the database. If no default is set, the default provider is sqlite.

The init args for the Raft provider above are separated by , (a small parsing sketch follows this list):

  • ./ is the working directory for the node. The working directory should contain the cluster.json file described below.
  • localhost is the hostname of the node.
  • 9004 is the port number of the node.
  • rtdb is the name of the thread running the RaftServer.
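As a small illustration of the format, the value string is simply split on ,; the helper name below is hypothetical:

-- hypothetical helper: split the provider's init-arg string on ","
local function parseProviderArgs(value)
    local args = {};
    for field in value:gmatch("[^,]+") do
        args[#args + 1] = field;
    end
    return args;
end

-- parseProviderArgs("./,localhost,9004,4,rtdb")
-- => { "./", "localhost", "9004", "4", "rtdb" }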

At startup, each server must know all of its peers. This is given by a JSON config file like the one below (setup/init-cluster.json, or the cluster.json in the node's working directory):

{
	"logIndex": 0,
	"lastLogIndex": 0,
	"servers":[
	   {
		   "id": 1,
		   "endpoint": "tcp://localhost:9001"
	   },
	   {
		   "id": 2,
		   "endpoint": "tcp://localhost:9002"
	   },
	   {
		   "id": 3,
		   "endpoint": "tcp://localhost:9003"
	   }
	]
}

There are several scripts (setup.bat, addsrv.bat, stopNPL.bat) to facilitate deployment; see the setup folder.

Network/Communication

Communication uses npl_mod/Raft/Rpc.lua, which includes RaftRequestRPC and RTDBRequestRPC. RaftRequestRPC is used at the Raft level and RTDBRequestRPC at the TableDB level. Both are used in a full-duplex way: they not only send requests but also receive responses.

  • For RaftRequestRPC, see RpcListener:startListening and PeerServer:SendRequest.
  • For RTDBRequestRPC, see RaftTableDBStateMachine:start, RaftClient:tryCurrentLeader and RaftTableDBStateMachine:start2.
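The commit code above already shows the sending side, RTDBRequestRPC(nil, serverId, msg). On the receiving side, the client can route each response msg {err, data, cb_index} back to the original caller via cb_index; below is a minimal sketch of that bookkeeping (the table and function names are hypothetical):

-- hypothetical client-side bookkeeping: map cb_index -> user callback
local pendingCallbacks = {};
local nextCbIndex = 0;

-- remember the callback before sending a log entry to the cluster
local function registerCallback(cb)
    nextCbIndex = nextCbIndex + 1;
    pendingCallbacks[nextCbIndex] = cb;
    return nextCbIndex;     -- stored in the log entry as cb_index
end

-- invoked when a response msg {err, data, cb_index} arrives via RTDBRequestRPC
local function onResponse(msg)
    local cb = pendingCallbacks[msg.cb_index];
    if cb then
        pendingCallbacks[msg.cb_index] = nil;
        cb(msg.err, msg.data);
    end
end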

Snapshot

  • A snapshot is made for every collection in TableDB, that is, one snapshot per collection. Each collection can have multiple snapshots at different log indices. Snapshots are not deleted at the moment (should they be?).
  • Snapshots are created in a separate thread; see RaftTableDBStateMachine:createSnapshot. If a snapshot fails to be created, we try to copy the previous snapshot to the current log index.

Phase 2

Phase 2 will mainly boost throughput in the following ways:

  1. use sqlite's WAL as the Raft log (the main item).
  2. compress the log.

Survey/Research

  1. actordb uses 2 engines (sqlite and lmdb) connected through the WAL. We could adopt this approach, and should do so progressively.
  2. LMDB is fast; see Reference 1. 30 million keys per node is enough in our situation:

On smaller runs (30Million or less), LMDB came out on top on most of the metrics except for disk size. This is actually what we’d expect for B-trees: they’re faster the fewer keys you have in them.

  3. actordb will change its architecture in the next version (using something other than sqlite and lmdb); this may be a clue about the new architecture.
  4. Either way, we need to change the current Raft implementation.
  5. Snapshotting of the state machine is not handled in actordb; we could keep our previous approach.

Plan/Roadmap

A more practical roadmap:

  1. hook into sqlite's WAL to get the WAL data (maybe frames) as the Raft log.
  2. reorganize the Raft implementation to adapt to the WAL, including dealing with sqlite checkpoints.
  3. test the throughput and fix issues.

Result

  1. InsertThroughputNoIndex: one million non-indexed insert operations with 1000 concurrent jobs take 2167.81s, while the non-Raft implementation takes 150.346s; both measured with paraengineclient_d.exe, best effort with NPL.

Reference

  1. Benchmarking LevelDB vs RocksDB vs HyperLevelDB vs LMDB performance (more results)
  2. ActorDB: how and why we run SQLite on top of LMDB
  3. ActorDB overview
  4. Lightning Memory-Mapped Database (LMDB)