Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace iodb with leveldb #969

Merged
merged 11 commits into from
Dec 27, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.openjdk.jmh.annotations._
import org.slf4j.LoggerFactory
import scorex.crypto.authds.avltree.batch.{Operation, PersistentBatchAVLProver, VersionedIODBAVLStorage}
import scorex.crypto.hash.{Blake2b256, Digest32}
import scorex.db.LDBVersionedStore

object AVLTreeBatchPerformance extends {

Expand All @@ -20,7 +21,8 @@ object AVLTreeBatchPerformance extends {

val logger = LoggerFactory.getLogger("TEST")
var prover: Prover = _
var store: LSMStore = _
//var store: LSMStore = _
aslesarenko marked this conversation as resolved.
Show resolved Hide resolved
var store: LDBVersionedStore = _
var storage: VersionedIODBAVLStorage[Digest32] = _
var operations: Array[Operation] = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.iohk.iodb.{FileAccess, LSMStore}
import scorex.crypto.authds.avltree.batch._
import scorex.crypto.authds.{ADKey, ADValue}
import scorex.crypto.hash.{Blake2b256, Blake2b256Unsafe, Digest32}
import scorex.db.LDBVersionedStore
import scorex.utils.Random

object Helper {
Expand Down Expand Up @@ -33,10 +34,11 @@ object Helper {
inserts ++ updates
}

def getPersistentProverWithLSMStore(keepVersions: Int, baseOperationsCount: Int = 0): (Prover, LSMStore, VersionedIODBAVLStorage[Digest32]) = {
def getPersistentProverWithLSMStore(keepVersions: Int, baseOperationsCount: Int = 0): (Prover, LDBVersionedStore, VersionedIODBAVLStorage[Digest32]) = {
val dir = java.nio.file.Files.createTempDirectory("bench_testing_" + scala.util.Random.alphanumeric.take(15)).toFile
dir.deleteOnExit()
val store = new LSMStore(dir, keepVersions = keepVersions, fileAccess = FileAccess.UNSAFE)
//val store = new LSMStore(dir, keepVersions = keepVersions, fileAccess = FileAccess.UNSAFE)
val store = new LDBVersionedStore(dir, keepVersions = keepVersions)
val storage = new VersionedIODBAVLStorage(store, NodeParameters(kl, Some(vl), ll))
require(storage.isEmpty)
val prover = new BatchAVLProver[Digest32, HF](kl, Some(vl))
Expand Down
133 changes: 133 additions & 0 deletions avldb/src/main/scala/scorex/db/LDBFactory.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package scorex.db

import java.io.File
import java.util.concurrent.locks.ReentrantReadWriteLock

import org.iq80.leveldb.{DB, Range, DBFactory, DBIterator, Options, ReadOptions, Snapshot, WriteBatch, WriteOptions}
import scorex.util.ScorexLogging

import scala.collection.mutable
import scala.util.Try

// Registry of opened LevelDB instances.
// LevelDB prohibit access to the same storage file from more than one DB instance.
// And ergo application (mostly tests) quit frequently doesn't not explicitly close
// database and tries to reopen it.
aslesarenko marked this conversation as resolved.
Show resolved Hide resolved
case class StoreRegistry(val factory : DBFactory) extends DBFactory {

val lock = new ReentrantReadWriteLock()
val map = new mutable.HashMap[File, RegisteredDB]

// Decorator of LevelDB DB class which overrides close() methods and unlinks database from registry on close.
aslesarenko marked this conversation as resolved.
Show resolved Hide resolved
case class RegisteredDB(val impl:DB, val path: File) extends DB {
var count: Int = 0
aslesarenko marked this conversation as resolved.
Show resolved Hide resolved

def get(key: Array[Byte]): Array[Byte] = impl.get(key)

def get(key: Array[Byte], options: ReadOptions): Array[Byte] = impl.get(key, options)

def iterator: DBIterator = impl.iterator

def iterator(options: ReadOptions): DBIterator = impl.iterator(options)

def put(key: Array[Byte], value: Array[Byte]) = impl.put(key, value)

def delete(key: Array[Byte]) = impl.delete(key)

def write(batch: WriteBatch) = impl.write(batch)

def write(batch: WriteBatch, options: WriteOptions) = impl.write(batch, options)

def createWriteBatch: WriteBatch = impl.createWriteBatch()

def put(key: Array[Byte], value: Array[Byte], options: WriteOptions) = impl.put(key, value, options)

def delete(key: Array[Byte], options: WriteOptions) = impl.delete(key, options)

def getSnapshot: Snapshot = impl.getSnapshot()

def getApproximateSizes(ranges: Range*): Array[Long] = impl.getApproximateSizes(ranges: _*)

def getProperty(name: String): String = impl.getProperty(name)

def suspendCompactions = impl.suspendCompactions()

def resumeCompactions = impl.resumeCompactions()

def compactRange(begin: Array[Byte], end: Array[Byte]) = impl.compactRange(begin, end)

override def close() = {
remove(path)
impl.close()
}
}

private def add(file: File, create: => DB): DB = {
lock.writeLock().lock()
try {
map.getOrElseUpdate(file, new RegisteredDB(create, file))
} finally {
lock.writeLock().unlock()
}
}

private def remove(path:File): Unit = {
lock.writeLock().lock()
try {
map.remove(path)
} finally {
lock.writeLock().unlock()
}
}

def open(path: File, options: Options): DB = {
lock.writeLock().lock()
try {
add(path, factory.open(path, options))
} finally {
lock.writeLock().unlock()
}
}

def destroy(path: File, options: Options) = {
factory.destroy(path, options)
}

def repair(path: File, options: Options) = {
factory.repair(path, options)
}
}

object LDBFactory extends ScorexLogging {

private val nativeFactory = "org.fusesource.leveldbjni.JniDBFactory"
private val javaFactory = "org.iq80.leveldb.impl.Iq80DBFactory"

lazy val factory: DBFactory = {
val loaders = List(ClassLoader.getSystemClassLoader, this.getClass.getClassLoader)
val factories = List(nativeFactory, javaFactory)
val pairs = loaders.view
.zip(factories)
.flatMap { case (loader, factoryName) =>
loadFactory(loader, factoryName).map(factoryName -> _)
}

val (name, factory) = pairs.headOption.getOrElse(
throw new RuntimeException(s"Could not load any of the factory classes: $nativeFactory, $javaFactory"))

if (name == javaFactory) {
log.warn("Using the pure java LevelDB implementation which is still experimental")
} else {
log.info(s"Loaded $name with $factory")
}
new StoreRegistry(factory)
}

private def loadFactory(loader: ClassLoader, factoryName: String): Option[DBFactory] =
try Some(loader.loadClass(factoryName).getConstructor().newInstance().asInstanceOf[DBFactory])
catch {
case e: Throwable =>
log.warn(s"Failed to load database factory $factoryName due to: $e")
None
}
}
Loading