Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Fix blocking in several KV store implementations #36

Merged
merged 1 commit into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ dependencyManagement {
entry 'kotlin-stdlib'
entry 'kotlin-stdlib-jdk8'
}
dependency('org.jetbrains.kotlinx:kotlinx-coroutines-core:0.24.0')
dependencySet(group: 'org.jetbrains.kotlinx', version: '0.24.0') {
entry 'kotlinx-coroutines-core'
entry 'kotlinx-coroutines-jdk8'
}
dependencySet(group: 'org.jetbrains.spek', version: '1.1.5') {
entry 'spek-api'
entry 'spek-junit-platform-engine'
Expand Down
1 change: 1 addition & 0 deletions kv/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ description = 'Key value store implementations.'
dependencies {
compile project(':bytes')
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
compileOnly project(':concurrent')
compileOnly 'io.lettuce:lettuce-core'
compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
Expand Down
11 changes: 7 additions & 4 deletions kv/src/main/kotlin/net/consensys/cava/kv/LevelDBKeyValueStore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package net.consensys.cava.kv

import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
import kotlinx.coroutines.experimental.withContext
import net.consensys.cava.bytes.Bytes
import org.fusesource.leveldbjni.JniDBFactory
Expand All @@ -21,6 +21,7 @@ import org.iq80.leveldb.Options
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import kotlin.coroutines.experimental.CoroutineContext

/**
* A key-value store backed by LevelDB.
Expand All @@ -30,7 +31,9 @@ class LevelDBKeyValueStore
@JvmOverloads
constructor(
databasePath: Path,
options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong())
options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
// TODO: replace with IO context when https://github.com/Kotlin/kotlinx.coroutines/issues/79 is resolved
private val context: CoroutineContext = newFixedThreadPoolContext(4, "LevelDBKeyValueStore")
) : KeyValueStore {

private val db: DB
Expand All @@ -40,7 +43,7 @@ constructor(
db = JniDBFactory.factory.open(databasePath.toFile(), options)
}

override suspend fun get(key: Bytes): Bytes? = withContext(Unconfined) {
override suspend fun get(key: Bytes): Bytes? = withContext(context) {
val rawValue = db[key.toArrayUnsafe()]
if (rawValue == null) {
null
Expand All @@ -49,7 +52,7 @@ constructor(
}
}

override suspend fun put(key: Bytes, value: Bytes) = withContext(Unconfined) {
override suspend fun put(key: Bytes, value: Bytes) = withContext(context) {
db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
}

Expand Down
13 changes: 9 additions & 4 deletions kv/src/main/kotlin/net/consensys/cava/kv/MapDBKeyValueStore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package net.consensys.cava.kv

import kotlinx.coroutines.experimental.newFixedThreadPoolContext
import kotlinx.coroutines.experimental.withContext
import net.consensys.cava.bytes.Bytes
import org.mapdb.DB
import org.mapdb.DBMaker
Expand All @@ -21,14 +23,17 @@ import org.mapdb.HTreeMap
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import kotlin.coroutines.experimental.CoroutineContext

/**
* A key-value store backed by a MapDB instance.
*/
class MapDBKeyValueStore
@Throws(IOException::class)
constructor(
databasePath: Path
databasePath: Path,
// TODO: replace with IO context when https://github.com/Kotlin/kotlinx.coroutines/issues/79 is resolved
private val context: CoroutineContext = newFixedThreadPoolContext(4, "MapDBKeyValueStore")
) : KeyValueStore {

private val db: DB
Expand All @@ -40,11 +45,11 @@ constructor(
storageData = db.hashMap("storageData", BytesSerializer(), BytesSerializer()).createOrOpen()
}

override suspend fun get(key: Bytes): Bytes? {
return storageData[key]
override suspend fun get(key: Bytes): Bytes? = withContext(context) {
storageData[key]
}

override suspend fun put(key: Bytes, value: Bytes) {
override suspend fun put(key: Bytes, value: Bytes) = withContext(context) {
storageData[key] = value
db.commit()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import io.lettuce.core.RedisURI
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.codec.RedisCodec
import kotlinx.coroutines.experimental.future.await
import net.consensys.cava.bytes.Bytes
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.CompletionStage

class RedisKeyValueStore(uri: String)
: KeyValueStore {
Expand All @@ -39,12 +41,11 @@ class RedisKeyValueStore(uri: String)
asyncCommands = conn.async()
}

override suspend fun get(key: Bytes): Bytes? {
return asyncCommands.get(key).get()
}
override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await()

override suspend fun put(key: Bytes, value: Bytes) {
asyncCommands.set(key, value).get()
val future: CompletionStage<String> = asyncCommands.set(key, value)
future.await()
}

override fun close() {
Expand Down