Skip to content

Commit

Permalink
make OnMemoryTable thread-safety
Browse files Browse the repository at this point in the history
Signed-off-by: Kengo TODA <skypencil@gmail.com>
  • Loading branch information
KengoTODA committed Mar 30, 2024
1 parent 79cd6f1 commit 34f4041
Showing 1 changed file with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import jp.skypencil.kosmo.backend.storage.shared.Table
import jp.skypencil.kosmo.backend.value.Row
import jp.skypencil.kosmo.backend.value.RowId
import jp.skypencil.kosmo.backend.value.TransactionId
import java.util.Collections.synchronizedMap
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class OnMemoryTable(private val name: String, private val transactionManager: TransactionManager) : Table {
private val map = synchronizedMap(mutableMapOf<RowId, MutableMap<TransactionId, Row>>())
private val lock = Mutex()
private val map = mutableMapOf<RowId, MutableMap<TransactionId, Row>>()

override fun getName(): String = name

Expand All @@ -25,44 +27,51 @@ class OnMemoryTable(private val name: String, private val transactionManager: Tr
override suspend fun find(
tx: TransactionId,
id: RowId,
): Row {
checkNotNull(map[id]) {
"$this does not contain $id"
}
return checkNotNull(snapshotAt(tx, id)) {
"$this does not contain $id"
): Row =
lock.withLock {
checkNotNull(map[id]) {
"$this does not contain $id"
}
checkNotNull(snapshotAt(tx, id)) {
"$this does not contain $id"
}
}
}

override suspend fun tableScan(tx: TransactionId): Sequence<Row> =
map.entries.mapNotNull {
snapshotAt(tx, it.key)
}.asSequence()
lock.withLock {
map.entries.mapNotNull {
snapshotAt(tx, it.key)
}.asSequence()
}

override suspend fun insert(
tx: TransactionId,
row: Row,
) {
check(map[row.id] == null) {
"$this already has $row"
lock.withLock {
check(map[row.id] == null) {
"$this already has $row"
}
map[row.id] = mutableMapOf(Pair(tx, row))
}
map[row.id] = mutableMapOf(Pair(tx, row))
}

override suspend fun delete(
tx: TransactionId,
id: RowId,
): Boolean = map.remove(id) != null
): Boolean = lock.withLock { map.remove(id) != null }

override suspend fun update(
tx: TransactionId,
row: Row,
) {
val history =
checkNotNull(map[row.id]) {
"$this does not contain ${row.id}"
}
history[tx] = row
lock.withLock {
val history =
checkNotNull(map[row.id]) {
"$this does not contain ${row.id}"
}
history[tx] = row
}
}

override fun toString() = "Table(name=$name)"
Expand Down

0 comments on commit 34f4041

Please sign in to comment.