Skip to content

Commit

Permalink
working on new core
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanpopelyshev committed Jan 22, 2017
1 parent 27c502d commit 096f9d2
Show file tree
Hide file tree
Showing 12 changed files with 541 additions and 36 deletions.
35 changes: 25 additions & 10 deletions build.gradle
@@ -1,26 +1,28 @@
group 'com.github.mauricio.kotlin'
group 'com.kotlinports.pooled'
version '0.1-SNAPSHOT'

buildscript {
ext.kotlin_version = '1.1.0-dev-5744'
ext.kotlin_version = '1.1.0-beta-17'
repositories {
mavenCentral()
maven { url 'https://dl.bintray.com/kotlin/kotlin-dev' }
maven {
url 'http://repository.jetbrains.com/all'
}
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0-M3'
}
}

subprojects {
repositories {
mavenCentral()
maven { url 'https://dl.bintray.com/kotlin/kotlin-dev' }
maven {
name 'bintray'
url 'http://jcenter.bintray.com'
}
}
mavenCentral()
maven { url 'https://dl.bintray.com/kotlin/kotlin-dev' }
maven { url 'http://dl.bintray.com/kotlin/kotlin-eap-1.1' }
maven { url 'http://jcenter.bintray.com' }
}

apply plugin: 'java'
apply plugin: 'kotlin'
Expand All @@ -30,12 +32,25 @@ subprojects {
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile 'io.github.microutils:kotlin-logging:1.4'
testCompile 'org.slf4j:slf4j-jdk14:1.7.21'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.3-beta'
testCompile 'org.jetbrains.spek:spek-api:1.1.0-beta2'
testRuntime 'org.slf4j:slf4j-jdk14:1.7.21'
testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.0-beta2'
}

jar {
manifest.attributes provider: 'gradle'
}

apply plugin: 'org.junit.platform.gradle.plugin'

junitPlatform {
filters {
engines {
include 'spek'
}
}
}
}

task wrapper(type: Wrapper) {
Expand Down
3 changes: 3 additions & 0 deletions core/build.gradle
@@ -0,0 +1,3 @@
//nothing
dependencies {
}
@@ -0,0 +1,65 @@
package com.github.kotlinports.pooled.core

/**
* Raised when a pool has reached it's limit of available objects.
*
* @param message
*/
class PoolExhaustedException( message : String ) : IllegalStateException( message )

/**
* Thrown when the pool has already been closed.
*/
class PoolAlreadyTerminatedException : IllegalStateException( "This pool has already been terminated" )

interface AsyncObjectPool<T> {

/**
* Returns an object from the pool. If the pool can not create or enqueue
* requests it will raise an
* [[PoolExhaustedException]].
*
* @return object from the pool
*/

suspend fun take(): T

/**
* Returns an object taken from the pool back to it. This object will become available for another client to use.
* If the object is invalid or can not be reused for some reason
* it will raise the error that prevented this object of being added back to the pool.
* The object is then discarded from the pool.
*
* @param item
* @return
*/

suspend fun giveBack(item: T)

/**
* Closes this pool and future calls to **take** will cause the [[PoolAlreadyTerminatedException]].
*
* @return
*/

suspend fun close()

/**
* Retrieve and use an object from the pool for a single computation, returning it when the operation completes.
*
* @param block
* @return whatever inner block returns
*/

//sorry, no use here
suspend fun <A> use(block: suspend (t: T) -> A) : A {
val item = take()

try {
return block(item)
} finally {
giveBack(item)
}
}

}
@@ -0,0 +1,35 @@
package com.github.kotlinports.pooled.core

import kotlin.coroutines.CoroutineContext

interface ExecutionContext {
/**
* Kotlin Coroutine Context, contains Interceptor and marker for current thread
*/
val ktContext: CoroutineContext
}

/**
* If you pass this, it means you already allocated it somehow, set it ref count and such
*/
interface ExecutionService {
/**
* Start using the service, get or creates contexts, increase refcount for multi-purpose service
*/
fun open(): List<ExecutionContext>

/**
* Close the service, may be free contexts, decreases refcount for multi-purpose service
*/
fun close(usedContexts: List<ExecutionContext>)

/**
* sets timer on specific context
*/
fun setTimer(context: ExecutionContext, delay: Long, periodic: Boolean, handler: suspend (timerId: Long) -> Unit): Long

/**
* clears timer
*/
fun clearTimer(timerId: Long)
}
@@ -0,0 +1,59 @@
/**
* This file contents derive from from https://github.com/mauricio/postgresql-async
*/

package com.github.kotlinports.pooled.core

interface ObjectFactory<T> {
/**
*
* Creates a valid object to be used in the pool. This method can block if necessary to make sure a correctly built
* is created.
*
* @return
*/

suspend fun create() : T

/**
*
* This method should "close" and release all resources acquired by the pooled object. This object will not be used
* anymore so any cleanup necessary to remove it from memory should be made in this method. Implementors should not
* raise an exception under any circumstances, the factory should async.log and clean up the exception itself.
*
* @param item
*/

suspend fun destroy( item : T )

/**
*
* Validates that an object can still be used for it's purpose. This method should test the object to make sure
* it's still valid for clients to use. If you have a database connection, test if you are still connected, if you're
* accessing a file system, make sure you can still see and change the file.
*
* You decide how fast this method should return and what it will test, you should usually do something that's fast
* enough not to slow down the pool usage, since this call will be made whenever an object returns to the pool.
*
* @param item an object produced by this pool
* @return
*/
suspend fun validate( item : T )

/**
*
* Does a full test on the given object making sure it's still valid. Different than validate, that's called whenever
* an object is given back to the pool and should usually be fast, this method will be called when objects are
* idle to make sure they don't "timeout" or become stale in anyway.
*
* For convenience, this method defaults to call **validate** but you can implement it in a different way if you
* would like to.
*
* @param item an object produced by this pool
* @return
*/

suspend fun test( item : T )


}
@@ -0,0 +1,61 @@
package com.github.kotlinports.pooled.core

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.Continuation

open class PartitionedAsyncObjectPool<T>(
val factory: ObjectFactory<T>,
val configuration: PoolConfiguration,
val numberOfPartitions: Int)
: AsyncObjectPool<T> {

//TODO: why is it a map? why not array?
private val pools = Array<Pair<Int, SingleThreadedAsyncObjectPool<T>>>(numberOfPartitions,
{
i ->
Pair(i, SingleThreadedAsyncObjectPool<T>(factory, partitionConfig()))
}).toMap()

private val checkouts = ConcurrentHashMap<T, SingleThreadedAsyncObjectPool<T>>()

override suspend fun take(): T {
Executors.newCachedThreadPool()
val pool = currentPool()
val item = pool.take()
checkouts.put(item, pool)
return item
}

override suspend fun giveBack(item: T) {
checkouts
.remove(item)!!
.giveBack(item)
}

override suspend fun close() {
pools.values.forEach { it.close() }
}

fun availables(): Iterable<T> = pools.values.flatMap { it.availables() }

fun inUse(): Iterable<T> = pools.values.flatMap { it.inUse() }

fun queued(): Iterable<Continuation<T>> = pools.values.flatMap { it.queued() }

protected fun isClosed() =
pools.values.find { !it.isClosed() } == null

private fun currentPool() =
pools[currentThreadAffinity()]!!

private fun currentThreadAffinity() =
(Thread.currentThread().id % numberOfPartitions).toInt()

private fun partitionConfig() =
configuration.copy(
maxObjects = configuration.maxObjects / numberOfPartitions,
maxQueueSize = configuration.maxQueueSize / numberOfPartitions
)
}
@@ -0,0 +1,23 @@
package com.github.kotlinports.pooled.core

/**
*
* Defines specific pieces of a pool's behavior.
*
* @param maxObjects how many objects this pool will hold
* @param maxIdle number of milliseconds for which the objects are going to be kept as idle (not in use by clients of the pool)
* @param maxQueueSize when there are no more objects, the pool can queue up requests to serve later then there
* are objects available, this is the maximum number of enqueued requests
* @param validationInterval pools will use this value as the timer period to validate idle objects.
*/

data class PoolConfiguration(
val maxObjects: Int,
val maxIdle: Long,
val maxQueueSize: Int,
val validationInterval: Long = 5000
) {
companion object {
val Default = PoolConfiguration(10, 4, 10)
}
}

0 comments on commit 096f9d2

Please sign in to comment.