Skip to content

Commit

Permalink
refactored executors (and fixed some problems), required by #19
Browse files Browse the repository at this point in the history
  • Loading branch information
Miha-x64 committed Mar 24, 2018
1 parent b6f6b22 commit 9fe23a4
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 85 deletions.

This file was deleted.

1 change: 1 addition & 0 deletions properties/build.gradle
Expand Up @@ -3,6 +3,7 @@ apply plugin: 'kotlin'

dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
compileOnly project(':fake-android')
testCompile group: 'junit', name: 'junit', version: '4.12'
}

Expand Down
@@ -0,0 +1,17 @@
package net.aquadc.properties.executor

import android.os.Handler
import java.util.concurrent.Executor

/**
* [Executor] implementation based on Android's [Handler].
* Will cause [NoClassDefFoundError] if called out of Android.
*/
class HandlerAsExecutor(
private val handler: Handler
) : Executor {

override fun execute(command: Runnable) =
check(handler.post(command))

}
14 changes: 14 additions & 0 deletions properties/src/main/kotlin/net/aquadc/properties/executor/fx.kt
@@ -0,0 +1,14 @@
package net.aquadc.properties.executor

import javafx.application.Platform
import java.util.concurrent.Executor

/**
* Wraps [Platform] to run JavaFX Application Thread.
*/
object FxApplicationThreadExecutor : Executor {

override fun execute(command: Runnable) =
Platform.runLater(command)

}
56 changes: 56 additions & 0 deletions properties/src/main/kotlin/net/aquadc/properties/executor/util.kt
@@ -0,0 +1,56 @@
package net.aquadc.properties.executor

import android.os.Handler
import android.os.Looper
import javafx.application.Platform
import java.util.concurrent.*


internal object ScheduledDaemonHolder {
internal val scheduledDaemon =
ScheduledThreadPoolExecutor(1, ThreadFactory { Thread(it).also { it.isDaemon = true } })
}

internal object PlatformExecutors {
private val executors = ConcurrentHashMap<Thread, Executor>(4)
private val executorFactories: Array<() -> Executor?>

init {
val facs = ArrayList<() -> Executor?>(2)

try {
Looper.myLooper() // ensure class available
facs.add {
Looper.myLooper()?.let { myLooper -> HandlerAsExecutor(Handler(myLooper)) }
}
} catch (ignored: NoClassDefFoundError) {}

try {
Platform.isFxApplicationThread() // ensure class available
facs.add {
if (Platform.isFxApplicationThread()) FxApplicationThreadExecutor else null
}
} catch (ignored: NoClassDefFoundError) {}

executorFactories = facs.toTypedArray()
}

internal fun executorForCurrentThread(): Executor {
val thread = Thread.currentThread()

val ex = executors[thread]
if (ex != null) return ex

val newEx = createForCurrentThread()

// if putIfAbsent returns non-null value, the executor was set concurrently,
// use it and throw away our then
return executors.putIfAbsent(thread, newEx) ?: newEx
}

private fun createForCurrentThread(): Executor {
executorFactories.forEach { it()?.let { return it } }
throw UnsupportedOperationException("Can't execute task on $this")
}

}
Expand Up @@ -2,10 +2,11 @@ package net.aquadc.properties.internal

import net.aquadc.properties.ChangeListener
import net.aquadc.properties.Property
import net.aquadc.properties.executor.PlatformExecutors
import net.aquadc.properties.executor.ScheduledDaemonHolder
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater

Expand Down Expand Up @@ -34,35 +35,23 @@ class ConcDebouncedProperty<out T>(
it.first
}

Pair(reallyOld, scheduled.schedule(NotifyOnCorrectThread(listeners, reallyOld, new), delay, unit))
}
}
}

/**
* Why not a lambda? This class's private modifier helps ProGuard find out that outer is not used.
* Btw, this not helps much: if `debounced` used anywhere, outer will be kept.
*/
private class NotifyOnCorrectThread<T>(
private val listeners: List<Pair<Thread, ChangeListener<T>>>,
private val old: T,
private val new: T
) : Runnable {
override fun run() {
listeners.forEach {
PlatformExecutors.executorForThread(it.first).execute { it.second(old, new) }
Pair(reallyOld,
ScheduledDaemonHolder.scheduledDaemon.schedule({
listeners.forEach {
it.first.execute { it.second(reallyOld, new) }
}
}, delay, unit)
)
}
}
}

override fun getValue(): T =
original.getValue()

private val listeners = CopyOnWriteArrayList<Pair<Thread, ChangeListener<T>>>()
private val listeners = CopyOnWriteArrayList<Pair<Executor, ChangeListener<T>>>()
override fun addChangeListener(onChange: (old: T, new: T) -> Unit) {
val thread = Thread.currentThread()
PlatformExecutors.executorForThread(thread) // ensure such executor exists
listeners.add(thread to onChange)
listeners.add(PlatformExecutors.executorForCurrentThread() to onChange)
}
override fun removeChangeListener(onChange: (old: T, new: T) -> Unit) {
listeners.firstOrNull { it.second == onChange }?.let { listeners.remove(it) }
Expand All @@ -75,8 +64,6 @@ class ConcDebouncedProperty<out T>(
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <T> pendingUpdater() =
pendingUpdater as AtomicReferenceFieldUpdater<ConcDebouncedProperty<T>, Pair<T, ScheduledFuture<*>>?>

internal val scheduled = ScheduledThreadPoolExecutor(1, ThreadFactory { Thread(it).also { it.isDaemon = true } })
}

}
@@ -1,6 +1,8 @@
package net.aquadc.properties.internal

import net.aquadc.properties.Property
import net.aquadc.properties.executor.PlatformExecutors
import net.aquadc.properties.executor.ScheduledDaemonHolder
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

Expand All @@ -14,7 +16,7 @@ class UnsDebouncedProperty<out T>(
private val unit: TimeUnit
) : UnsListeners<T>() {

private val executor = PlatformExecutors.executorForThread(Thread.currentThread())
private val executor = PlatformExecutors.executorForCurrentThread()
private var pending: Pair<T, ScheduledFuture<*>>? = null

init {
Expand All @@ -29,7 +31,7 @@ class UnsDebouncedProperty<out T>(
it.first
}

pending = Pair(reallyOld, ConcDebouncedProperty.scheduled.schedule({
pending = Pair(reallyOld, ScheduledDaemonHolder.scheduledDaemon.schedule({
executor.execute {
valueChanged(reallyOld, new)
}
Expand Down
@@ -1,10 +1,6 @@
package net.aquadc.properties.internal

import javafx.application.Platform
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.collections.ArrayList


internal inline fun <T, V> AtomicReferenceFieldUpdater<T, V>.update(zis: T, update: (V) -> V) {
Expand Down Expand Up @@ -45,38 +41,3 @@ internal inline fun <reified T> Array<T>.withoutNulls(): Array<T> {

return newArray as Array<T>
}

object PlatformExecutors {
private val executors = ConcurrentHashMap<Thread, Executor>()
private val executorFactories: Array<(Thread) -> Executor?>

init {
val facs = ArrayList<(Thread) -> Executor?>(2)

try {
facs.add(Class.forName("net.aquadc.properties.android.LooperExecutorFactory").newInstance() as (Thread) -> Executor?)
} catch (ignored: ClassNotFoundException) {}

try {
Platform.isFxApplicationThread() // ensure class available
facs.add { if (Platform.isFxApplicationThread()) Executor(Platform::runLater) else null }
} catch (ignored: NoClassDefFoundError) {}

executorFactories = facs.toTypedArray()
}

internal fun executorForThread(thread: Thread): Executor {
val ex = executors[thread]
if (ex == null) {
val newEx = thread.createExecutor()
executors.putIfAbsent(thread, newEx)
}
return executors[thread]!!
}

private fun Thread.createExecutor(): Executor {
executorFactories.forEach { it(this)?.let { return it } }
throw UnsupportedOperationException("Can't schedule task on $this")
}

}

0 comments on commit 9fe23a4

Please sign in to comment.