-
Notifications
You must be signed in to change notification settings - Fork 167
/
LDBKVStore.scala
91 lines (80 loc) · 2.72 KB
/
LDBKVStore.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package scorex.db
import org.iq80.leveldb.DB
import scorex.util.ScorexLogging
import scala.util.{Failure, Success, Try}
import spire.syntax.all.cfor
/**
* A LevelDB wrapper providing a convenient non-versioned database interface.
*
* Both keys and values are var-sized byte arrays.
*/
class LDBKVStore(protected val db: DB) extends KVStoreReader with ScorexLogging {
/** Immutable empty array can be shared to avoid allocations. */
private val emptyArrayOfByteArray = Array.empty[Array[Byte]]
/**
* Update this database atomically with a batch of insertion and removal operations
*
* @param toInsertKeys - keys of key-value pairs to insert into database
* @param toInsertValues - values of key-value pairs to insert into database
* @param toRemove - keys of key-value pairs to remove from the database
* @return - error if it happens, or success status
*/
def update(toInsertKeys: Array[K], toInsertValues: Array[V], toRemove: Array[K]): Try[Unit] = {
val batch = db.createWriteBatch()
try {
require(toInsertKeys.length == toInsertValues.length)
cfor(0)(_ < toInsertKeys.length, _ + 1) { i => batch.put(toInsertKeys(i), toInsertValues(i))}
cfor(0)(_ < toRemove.length, _ + 1) { i => batch.delete(toRemove(i))}
db.write(batch)
Success(())
} catch {
case t: Throwable => Failure(t)
} finally {
batch.close()
}
}
/**
* Insert single key-value into database
* @param id - key to insert
* @param value - value to insert
* @return - Success(()) in case of successful insertion, Failure otherwise
*/
def insert(id: K, value: V): Try[Unit] = {
try {
db.put(id, value)
Success(())
} catch {
case t: Throwable => Failure(t)
}
}
/**
* `update` variant where we only insert values into this database
*/
def insert(keys: Array[K], values: Array[V]): Try[Unit] = {
update(keys, values, emptyArrayOfByteArray)
}
/**
* `update` variant where we only remove values from this database
*/
def remove(keys: Array[K]): Try[Unit] = {
update(emptyArrayOfByteArray, emptyArrayOfByteArray, keys)
}
/**
* Get last key within some range (inclusive) by used comparator.
* Could be useful for applications with sequential ids.
* The method iterates over all the keys so could be slow if there are many keys in the range.
*/
def lastKeyInRange(first: Array[Byte], last: Array[Byte]): Option[K] = {
import util.control.Breaks._
val i = db.iterator()
var res: Option[K] = None
i.seek(first)
breakable {
while (i.hasNext) {
val key = i.next().getKey
if (ByteArrayUtils.compare(key, last) <= 0) res = Some(key) else break
}
}
res
}
}