Skip to content

Commit

Permalink
delivering transaction commits to ManagedProperties, #32
Browse files Browse the repository at this point in the history
  • Loading branch information
Miha-x64 committed Aug 4, 2018
1 parent 956ecd9 commit f38b26c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 26 deletions.
@@ -1,16 +1,18 @@
package net.aquadc.properties.internal

import android.support.annotation.RestrictTo
import net.aquadc.properties.MutableProperty
import net.aquadc.properties.Property

/**
* A property whose value can be changed inside a transaction.
*/
@PublishedApi internal class ManagedProperty<T, TOKN>(
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
class ManagedProperty<T, TOKN>(
private val manager: Manager<TOKN, T>,
private val token: TOKN,
private val id: Long
) : `Notifier+1AtomicRef`<T, T>(true, unset()), MutableProperty<T>, (@ParameterName("newValue") T) -> Unit {
) : `Notifier+1AtomicRef`<T, T>(true, unset()), MutableProperty<T> {

override var value: T
get() {
Expand Down Expand Up @@ -45,7 +47,7 @@ import net.aquadc.properties.Property
val clean = if (ref === Unset) manager.getClean(token, id) else Unset
// after mutating dirty state we won't be able to see the clean one, so preserve it

val success = manager.set(token, id, expect, update, this)
val success = manager.set(token, id, expect, update)
// this changes 'dirty' state (and value returned by 'get'),
// but we don't want to deliver it until it becomes clean

Expand All @@ -54,7 +56,7 @@ import net.aquadc.properties.Property
return success
}

override fun invoke(newValue: T) {
fun commit(newValue: T) {
if (newValue !== Unset) {
// the transaction was committed

Expand Down Expand Up @@ -82,12 +84,7 @@ interface Manager<TOKN, T> {

/**
* Set, if [expected] === [Unset]; CAS otherwise.
* @param onTransactionEnd will be invoked after transaction end;
* param newValue: new value, if transaction was committed, or [Unset], if it was rolled back
* @return if write was successful; simple sets are always successful, even if current value is already equal to [update].
*/
fun set(token: TOKN, id: Long, expected: Any?, update: T, onTransactionEnd: (newValue: T) -> Unit): Boolean
fun set(token: TOKN, id: Long, expected: Any?, update: T): Boolean
}

inline fun <T, TOKN> newManagedProperty(manager: Manager<TOKN, T>, token: TOKN, id: Long): MutableProperty<T> =
ManagedProperty(manager, token, id)
@@ -1,12 +1,14 @@
package net.aquadc.properties.internal

import android.support.annotation.RestrictTo
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater

/**
* This is a [-Notifier] with extra [AtomicReferenceFieldUpdater].
* @param REF type of extra atomic ref
*/
internal abstract class `Notifier+1AtomicRef`<out T, REF>
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
abstract class `Notifier+1AtomicRef`<out T, REF>
internal constructor(concurrent: Boolean, initialRef: REF)
: `-Notifier`<T>(concurrent) {

Expand Down
34 changes: 22 additions & 12 deletions sql/src/main/kotlin/net/aquadc/properties/sql/jdbc-sqlite.kt
Expand Up @@ -4,9 +4,9 @@ package net.aquadc.properties.sql

import net.aquadc.properties.MutableProperty
import net.aquadc.properties.Property
import net.aquadc.properties.internal.ManagedProperty
import net.aquadc.properties.internal.Manager
import net.aquadc.properties.internal.Unset
import net.aquadc.properties.internal.newManagedProperty
import net.aquadc.properties.propertyOf
import java.sql.Connection
import java.sql.PreparedStatement
Expand Down Expand Up @@ -55,12 +55,20 @@ class JdbcSqliteSession(private val connection: Connection) : Session {
}

private fun onTransactionEnd(successful: Boolean) {
checkNotNull(transaction)
val transaction = transaction ?: throw AssertionError()
try {
if (successful) connection.commit() else connection.rollback()
transaction = null
this.transaction = null

// TODO notify
transaction.updated?.forEach { (col, pkToVal) ->
pkToVal.forEach { (pk, value) ->
@Suppress("UPPER_BOUND_VIOLATED")
fieldOfInternal<Any?, Any?, Any?>(col as Col<Any?, Any?>, pk).commit(value)
}
}
transaction.inserted?.forEach { (table, pk) ->
// todo
}
} finally {
lock.writeLock().unlock()
}
Expand Down Expand Up @@ -196,17 +204,19 @@ class JdbcSqliteSession(private val connection: Connection) : Session {
.executeQuery()
}

override fun <REC : Record<REC, ID>, ID : IdBound, T> fieldOf(col: Col<REC, T>, id: ID): MutableProperty<T> =
fieldOfInternal(col, id) // just erase type

override fun <REC : Record<REC, ID>, ID : IdBound, T> fieldOf(
table: Table<REC, ID>, col: Col<REC, T>, id: ID
): MutableProperty<T> {
fun <REC : Record<REC, ID>, ID : IdBound, T> fieldOfInternal(
col: Col<REC, T>, id: ID
): ManagedProperty<T, Col<REC, T>> {
val tableCols =
recordManager.records.getOrPut(col, ::ConcurrentHashMap) as ConcurrentHashMap<Long, MutableProperty<T>>
recordManager.records.getOrPut(col, ::ConcurrentHashMap) as ConcurrentHashMap<Long, ManagedProperty<T, Col<REC, T>>>

val localId = localId(table, id)
val localId = localId(col.table as Table<REC, ID>, id)

return tableCols.getOrPut(localId) {
newManagedProperty(recordManager as Manager<Col<REC, T>, T>, col, localId)
ManagedProperty(recordManager as Manager<Col<REC, T>, T>, col, localId)
}
}

Expand All @@ -232,7 +242,7 @@ class JdbcSqliteSession(private val connection: Connection) : Session {
* col: records
* record: fields
*/
internal val records = ConcurrentHashMap<Col<*, *>, ConcurrentHashMap<Long, MutableProperty<*>>>()
internal val records = ConcurrentHashMap<Col<*, *>, ConcurrentHashMap<Long, ManagedProperty<*, *>>>()

private val statements = ThreadLocal<HashMap<String, PreparedStatement>>()

Expand Down Expand Up @@ -260,7 +270,7 @@ class JdbcSqliteSession(private val connection: Connection) : Session {
}

@Suppress("UPPER_BOUND_VIOLATED")
override fun set(token: Col<*, *>, id: Long, expected: Any?, update: Any?, onTransactionEnd: (newValue: Any?) -> Unit): Boolean {
override fun set(token: Col<*, *>, id: Long, expected: Any?, update: Any?): Boolean {
val transaction = transaction ?: throw IllegalStateException("This can be performed only within a transaction")
getDirty(token, id).let {
if (it === Unset) {
Expand Down
6 changes: 3 additions & 3 deletions sql/src/main/kotlin/net/aquadc/properties/sql/library.kt
Expand Up @@ -17,7 +17,7 @@ interface Session {
fun <REC : Record<REC, ID>, ID : IdBound> select(table: Table<REC, ID>, condition: WhereCondition<out REC>): Property<List<REC>>
fun <REC : Record<REC, ID>, ID : IdBound> count(table: Table<REC, ID>, condition: WhereCondition<out REC>): Property<Long>

fun <REC : Record<REC, ID>, ID : IdBound, T> fieldOf(table: Table<REC, ID>, col: Col<REC, T>, id: ID): MutableProperty<T>
fun <REC : Record<REC, ID>, ID : IdBound, T> fieldOf(col: Col<REC, T>, id: ID): MutableProperty<T>
}

inline fun Session.transaction(block: (Transaction) -> Unit) {
Expand Down Expand Up @@ -141,7 +141,7 @@ abstract class Record<REC : Record<REC, ID>, ID : IdBound>(

infix fun <ForeREC : Record<ForeREC, ForeID>, ForeID : IdBound>
Col<REC, ForeID?>.toOneNullable(foreignTable: Table<ForeREC, ForeID>): MutableProperty<ForeREC?> =
session.fieldOf(this@Record.table, this, primaryKey).bind(
session.fieldOf(this, primaryKey).bind(
{ id -> if (id == null) null else session.require(foreignTable, id) },
{ it?.primaryKey }
)
Expand All @@ -151,7 +151,7 @@ abstract class Record<REC : Record<REC, ID>, ID : IdBound>(
session.select(foreignTable, this eq primaryKey)

operator fun <U> Col<REC, U>.invoke(): MutableProperty<U> =
session.fieldOf(this@Record.table, this, primaryKey)
session.fieldOf(this, primaryKey)

}

Expand Down

0 comments on commit f38b26c

Please sign in to comment.