Skip to content

Commit

Permalink
Trigger-based observability, SQL #32
Browse files Browse the repository at this point in the history
  • Loading branch information
Miha-x64 committed Sep 3, 2020
1 parent f0f8dba commit 5996252
Show file tree
Hide file tree
Showing 14 changed files with 983 additions and 43 deletions.
13 changes: 13 additions & 0 deletions README.md
Expand Up @@ -397,6 +397,19 @@ session.query(
structs(projection(string, string), BindBy.Position)
)
```
* Triggers:
```kt
val listener = session.observe(
UserTable to TriggerEvent.INSERT,
UserTable to TriggerEvent.DELETE,
) { report ->
val userChanges = report.of(UserTable)
println("+" + userChanges.inserted.size)
println("-" + userChanges.removed.size)
}
listener.close() // unsubscribe
```

## HTTP

Expand Down
Expand Up @@ -235,7 +235,8 @@ inline fun <SCH : Schema<SCH>, F : FieldDef<SCH, *, *>, reified R> SCH.mapIndexe
return out as Array<R>
}

@PublishedApi internal inline fun Long.forEachBit(func: (setBitIdx: Int, bitIdx: Int) -> Unit) {
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
inline fun Long.forEachBit(func: (setBitIdx: Int, bitIdx: Int) -> Unit) {
var idx = 0
var ord = 0
var bits = this
Expand Down
13 changes: 6 additions & 7 deletions sql/src/main/kotlin/net/aquadc/persistence/sql/Table.kt
Expand Up @@ -12,7 +12,6 @@ import net.aquadc.persistence.struct.NamedLens
import net.aquadc.persistence.struct.Schema
import net.aquadc.persistence.struct.StoredLens
import net.aquadc.persistence.struct.StoredNamedLens
import net.aquadc.persistence.struct.Struct
import net.aquadc.persistence.struct.forEachIndexed
import net.aquadc.persistence.type.DataType
import net.aquadc.persistence.type.Ilk
Expand Down Expand Up @@ -40,7 +39,7 @@ private constructor(
val idColName: CharSequence,
idColType: DataType.NotNull.Simple<ID>,
val pkField: ImmutableField<SCH, ID, out DataType.NotNull.Simple<ID>>? // todo: consistent names, ID || PK
// TODO: [unique] indices https://github.com/greenrobot/greenDAO/blob/72cad8c9d5bf25d6ed3bdad493cee0aee5af8a70/greendao-api/src/main/java/org/greenrobot/greendao/annotation/Index.java
// TODO: foreign keys, [unique] indices https://github.com/greenrobot/greenDAO/blob/72cad8c9d5bf25d6ed3bdad493cee0aee5af8a70/greendao-api/src/main/java/org/greenrobot/greendao/annotation/Index.java
) {

constructor(schema: SCH, name: String, idColName: String, idColType: DataType.NotNull.Simple<ID>) :
Expand All @@ -59,7 +58,7 @@ private constructor(

@JvmSynthetic @JvmField internal var _delegates: Map<StoredLens<SCH, *, *>, SqlPropertyDelegate<SCH, ID>>? = null
@JvmSynthetic @JvmField internal var _recipe: Array<out Nesting>? = null
// @JvmSynthetic @JvmField internal var _managedColumns: Array<out StoredNamedLens<SCH, *, *>>? = null
@JvmSynthetic @JvmField internal var _managedColumns: Array<out StoredNamedLens<SCH, *, *>>? = null
@JvmSynthetic @JvmField internal var _managedColNames: Array<out CharSequence>? = null
@JvmSynthetic @JvmField internal var _managedColTypes: Array<out Ilk<*, *>>? = null
@JvmSynthetic @JvmField internal var _managedColTypeNames: Array<out SqlTypeName>? = null
Expand Down Expand Up @@ -109,6 +108,7 @@ private constructor(
val manColTNs = if (skipPkCol) columnTypeNames.subList(1, columns.size) else columnTypeNames
_managedColTypeNames = if (manColTs == manColTNs) _managedColTypes else manColTNs.array()
_managedColNames = managedColumns.mapIndexedToArray { _, it -> it.name(schema) }
_managedColumns = managedColumns

types?.remove(columns[0]) // we've unconditionally peeked it earlier and conditionally polled within embed()
types?.takeIf { it.isNotEmpty() }?.let { throw RuntimeException("Cannot consume type overrides: " +
Expand Down Expand Up @@ -281,8 +281,8 @@ private constructor(
return idxOfCol - (if (pkField == null && idxOfCol >= 0) 1 else 0)
}

/*val managedColumns: Array<out StoredNamedLens<SCH, *, *>>
get() = _managedColumns ?: _columns.value.let { _ /* unwrap lazy */ -> _managedColumns!! }*/
val managedColumns: Array<out StoredNamedLens<SCH, *, *>>
get() = _managedColumns ?: _columns.value.let { _managedColumns!! }

val managedColNames: Array<out CharSequence>
get() = _managedColNames ?: _columns.value.let { _managedColNames!! }
Expand Down Expand Up @@ -371,8 +371,7 @@ inline fun <SCH : Schema<SCH>, ID : IdBound> tableOf(schema: SCH, name: String,
inline fun <SCH : Schema<SCH>, ID : IdBound> tableOf(schema: SCH, name: String, idCol: ImmutableField<SCH, ID, out DataType.NotNull.Simple<ID>>): Table<SCH, ID> =
Table(schema, name, idCol)

// just extend Table,
// but infer type arguments instead of forcing client to specify them explicitly in object expression
// infer type arguments instead of forcing client to specify them explicitly in object expression
// (he-he, modern Java allows writing `new Table<>() {}`):

inline fun <SCH : Schema<SCH>, ID : IdBound> tableOf(
Expand Down
Expand Up @@ -146,12 +146,6 @@ fun <SCH : Schema<SCH>, D> gridCallback(cb: (List<Struct<S>>) -> Unit): FetchStr
fun <T> CoroutineScope.lazyAsyncValue(): FetchValue<BlockingSession, T, Deferred<T>, AsyncList<T>> = TODO()
fun <SCH : Schema<SCH>, D> CoroutineScope.lazyAsyncStruct(): FetchStruct<BlockingSession, SCH, Nothing, D, AsyncStruct<S>, AsyncList<AsyncStruct<S>>> = TODO()
class ListChanges<SCH : Schema<SCH>, ID : IdBound>(
val oldIds: List<ID>, // List could wrap IntArray, for example. Array can't
val newIds: List<ID>,
val changes: Map<ID, FldSet<SCH>>
)
interface AsyncStruct<SCH : Schema<SCH>>
interface AsyncIterator<out T> {
Expand Down
182 changes: 176 additions & 6 deletions sql/src/main/kotlin/net/aquadc/persistence/sql/blocking/JdbcSession.kt
@@ -1,5 +1,9 @@
package net.aquadc.persistence.sql.blocking

import net.aquadc.collections.InlineEnumSet
import net.aquadc.collections.forEach
import net.aquadc.collections.toArr
import net.aquadc.persistence.NullSchema
import net.aquadc.persistence.array
import net.aquadc.persistence.fatAsList
import net.aquadc.persistence.fatMapTo
Expand All @@ -14,22 +18,32 @@ import net.aquadc.persistence.sql.Session
import net.aquadc.persistence.sql.Table
import net.aquadc.persistence.sql.Transaction
import net.aquadc.persistence.sql.FuncN
import net.aquadc.persistence.sql.ListChanges
import net.aquadc.persistence.sql.SqlTypeName
import net.aquadc.persistence.sql.TriggerEvent
import net.aquadc.persistence.sql.TriggerReport
import net.aquadc.persistence.sql.TriggerSubject
import net.aquadc.persistence.sql.Triggerz
import net.aquadc.persistence.sql.WhereCondition
import net.aquadc.persistence.sql.appendJoining
import net.aquadc.persistence.sql.bindInsertionParams
import net.aquadc.persistence.sql.bindQueryParams
import net.aquadc.persistence.sql.bindValues
import net.aquadc.persistence.sql.dialect.Dialect
import net.aquadc.persistence.sql.dialect.foldArrayType
import net.aquadc.persistence.sql.mapIndexedToArray
import net.aquadc.persistence.sql.noOrder
import net.aquadc.persistence.sql.wordCountForCols
import net.aquadc.persistence.struct.Schema
import net.aquadc.persistence.struct.Struct
import net.aquadc.persistence.type.AnyCollection
import net.aquadc.persistence.type.DataType
import net.aquadc.persistence.type.Ilk
import net.aquadc.persistence.type.i32
import net.aquadc.persistence.type.i64
import net.aquadc.persistence.type.serialized
import org.intellij.lang.annotations.Language
import java.io.Closeable
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
Expand All @@ -45,12 +59,15 @@ import kotlin.concurrent.getOrSet
@ExperimentalSql
class JdbcSession(
@JvmField @JvmSynthetic internal val connection: Connection,
@JvmField @JvmSynthetic internal val dialect: Dialect
@JvmField @JvmSynthetic internal val dialect: Dialect,
/**
* If your application is distributed, pass something identifying current node
* to avoid temp table or trigger name clashes.
*/
nodeName: String = // “-” does not seem to be a good identifier, let's be alphanumeric
java.lang.Double.doubleToLongBits(Math.random()).toString(36).replace('-', 'm')
) : Session<Blocking<ResultSet>> {

init {
connection.autoCommit = false
}
val changesPostfix = '_' + nodeName + "_changes"

@JvmField @JvmSynthetic internal val lock = ReentrantReadWriteLock()

Expand Down Expand Up @@ -138,11 +155,77 @@ class JdbcSession(
this@JdbcSession.transaction = null

if (successful) {
// old ORM-ish API
transaction.deliverChanges()
}
} finally {
lock.writeLock().unlock()
}

// new SQL-friendly API
if (successful) {
deliverTriggeredChanges()
}
}

// copy-paste, keep in sync with SQLite session
private fun deliverTriggeredChanges() {
val activeSubjects = triggers.activeSubjects()
if (activeSubjects.isEmpty()) return
val tableToChanges = HashMap<Table<*, *>, ListChanges<*, *>>()
connection.autoCommit = false
val stmt = connection.createStatement()
try {
val sb = StringBuilder()
activeSubjects.forEach { table ->
sb.setLength(0)
val rs = stmt.executeQuery(
with(dialect) {
sb.append("SELECT * FROM").append(' ').appendName(table.name + changesPostfix).toString()
}
)

val wordCount = wordCountForCols(table.managedColumns.size)
val inserted = HashSet<IdBound>()
val updated = HashMap<IdBound, Int>()
val removed = HashSet<IdBound>()
val changes = ArrayList<Long>() // OMG

val pkType = table.idColType
while (rs.next()) {
val pk = pkType.get(rs, 0)
when (val what = rs.getInt(1 + 1)) {
-1 -> removed.add(pk)
0 -> {
repeat(wordCount) { word -> changes.add(rs.getLong(2 + 1 + word)) }
check(updated.put(pk, changes.size / wordCount - 1) == null)
}
1 -> inserted.add(pk)
else -> error(what.toString())
}
}
check(tableToChanges.put(
table,
ListChanges(inserted, updated, removed, table as Table<NullSchema, IdBound>, changes.toLongArray())
) == null)

sb.setLength(0)
stmt.execute(
with(dialect) {
sb.append("DELETE FROM").append(' ').appendName(table.name + changesPostfix).toString()
}
)
}
stmt.close()

triggers.enqueue(TriggerReport(tableToChanges))
connection.commit()
} catch (t: Throwable) {
connection.rollback()
throw t
}

triggers.notifyPending()
}

private fun <SCH : Schema<SCH>, ID : IdBound> select(
Expand Down Expand Up @@ -389,11 +472,94 @@ class JdbcSession(

override fun close(cursor: ResultSet) =
cursor.close()

// copy-paste, keep in sync with SQLite session
override fun addTriggers(newbies: Map<Table<*, *>, InlineEnumSet<TriggerEvent>>) {
val sb = StringBuilder()
connection.autoCommit = false
val stmt = connection.createStatement()
try {
newbies.forEach { (table, events) ->
val wordCount = wordCountForCols(table.managedColumns.size)
val typeNames = arrayOfNulls<SqlTypeName>(wordCount + 1)
typeNames[0] = i32
typeNames.fill(i64, 1)
with(dialect) {
stmt.execute(sb.createTable(
temporary = true, ifNotExists = true,
name = table.name, namePostfix = changesPostfix,
idColName = "id", idColTypeName = table.idColTypeName,
managedPk = false /* avoid AUTO_INCREMENT or serial*/,
colNames = arrayOf("what") + (0 until wordCount).map { "ch$it" },
colTypes = typeNames as Array<out SqlTypeName>
).toString())
}
events.forEach { event ->
prepareAndCreateTrigger(sb, event, table, stmt, create = true)
}
}
stmt.close()
connection.commit()
} catch (t: Throwable) {
connection.rollback()
throw t
}
}

override fun removeTriggers(victims: Map<Table<*, *>, InlineEnumSet<TriggerEvent>>) {
val sb = StringBuilder()
val stmt = connection.createStatement()
try {
victims.forEach { (table, events) ->
stmt.execute(
with(dialect) {
sb.append("DELETE FROM").append(' ').appendName(table.name + changesPostfix)
.append(' ').append("WHERE").append(' ')
.appendJoining(events, " OR ") { ev -> append("what").append('=').append(ev.balance) }
.toString()
}
)
events.forEach { event ->
prepareAndCreateTrigger(sb, event, table, stmt, create = false)
}
}
stmt.close()
connection.commit()
} catch (t: Throwable) {
connection.rollback()
throw t
}
}

@Suppress("UPPER_BOUND_VIOLATED")
private fun prepareAndCreateTrigger(
sb: StringBuilder, event: TriggerEvent, table: Table<*, *>, stmt: Statement, create: Boolean
): Unit = with(dialect) {
table as Table<Schema<*>, IdBound>

if (!create) { // drop trigger before dropping function as trigger depends on it
sb.setLength(0)
stmt.execute(sb.changesTrigger<Schema<*>, IdBound>(changesPostfix, event, table, create = false).toString())
}

sb.setLength(0)
if (sb.prepareChangesTrigger<Schema<*>, IdBound>(changesPostfix, event, table, create).isNotEmpty()) {
stmt.execute(sb.toString())
}

if (create) {
sb.setLength(0)
stmt.execute(sb.changesTrigger<Schema<*>, IdBound>(changesPostfix, event, table, create = true).toString())
}
}
}


override fun beginTransaction(): Transaction =
createTransaction(lock, lowLevel).also { transaction = it }
createTransaction(lock, lowLevel).also {
connection.autoCommit = false
transaction = it
}

// endregion transactions and modifying statements

Expand Down Expand Up @@ -429,6 +595,10 @@ class JdbcSession(
): FuncN<Any, R> =
BlockingQuery(lowLevel, query, argumentTypes, fetch)

private val triggers = Triggerz(lowLevel)
override fun observe(vararg subject: TriggerSubject, listener: (TriggerReport) -> Unit): Closeable =
triggers.addListener(subject, listener)

override fun close() {
selectStatements.get()?.values
?.forEach(PreparedStatement::close) // Oops! Other threads' statements gonna dangle until GC
Expand Down
@@ -1,5 +1,6 @@
package net.aquadc.persistence.sql.blocking

import net.aquadc.collections.InlineEnumSet
import net.aquadc.persistence.NullSchema
import net.aquadc.persistence.FuncXImpl
import net.aquadc.persistence.sql.ColCond
Expand All @@ -12,6 +13,7 @@ import net.aquadc.persistence.sql.RealTransaction
import net.aquadc.persistence.sql.Record
import net.aquadc.persistence.sql.Session
import net.aquadc.persistence.sql.Table
import net.aquadc.persistence.sql.TriggerEvent
import net.aquadc.persistence.sql.WhereCondition
import net.aquadc.persistence.struct.Lens
import net.aquadc.persistence.struct.Schema
Expand Down Expand Up @@ -77,7 +79,7 @@ internal abstract class LowLevelSession<STMT, CUR> : Blocking<CUR> {
private val localReusableCond = ThreadLocal<ColCond<NullSchema, Any?>>()

@Suppress("UNCHECKED_CAST")
internal fun <SCH : Schema<SCH>, ID : IdBound> pkCond(
fun <SCH : Schema<SCH>, ID : IdBound> pkCond(
table: Table<SCH, ID>, value: ID
): ColCond<SCH, ID> {
val condition = (localReusableCond as ThreadLocal<ColCond<SCH, ID>>).getOrSet {
Expand All @@ -88,6 +90,9 @@ internal abstract class LowLevelSession<STMT, CUR> : Blocking<CUR> {
return condition
}

abstract fun addTriggers(newbies: Map<Table<*, *>, InlineEnumSet<TriggerEvent>>)
abstract fun removeTriggers(victims: Map<Table<*, *>, InlineEnumSet<TriggerEvent>>)

}

internal fun Session<*>.createTransaction(lock: ReentrantReadWriteLock, lowLevel: LowLevelSession<*, *>): RealTransaction {
Expand Down

0 comments on commit 5996252

Please sign in to comment.