Permalink
Browse files

Initial spike

  • Loading branch information...
0 parents commit 82b64d0368147b060fa0c1c4a086f9cc89f7c966 @chirino chirino committed Mar 9, 2010
Showing with 5,261 additions and 0 deletions.
  1. +9 −0 .gitignore
  2. +228 −0 hawttasks-scala/pom.xml
  3. +126 −0 hawttasks-scala/src/main/scala/org/fusesource/hawttasks/example/Router.scala
  4. +199 −0 hawttasks-scala/src/main/scala/org/fusesource/hawttasks/scala/RefCounted.scala
  5. +127 −0 hawttasks-scala/src/test/scala/org/fusesource/hawttasks/example/RouterTest.scala
  6. +65 −0 hawttasks/pom.xml
  7. +30 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchObject.java
  8. +37 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchOption.java
  9. +28 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchPriority.java
  10. +40 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchQueue.java
  11. +31 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchSource.java
  12. +62 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchSystem.java
  13. +41 −0 hawttasks/src/main/java/org/fusesource/hawttasks/Dispatcher.java
  14. +30 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatcherAware.java
  15. +66 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatcherConfig.java
  16. +39 −0 hawttasks/src/main/java/org/fusesource/hawttasks/DispatcherObserver.java
  17. +53 −0 hawttasks/src/main/java/org/fusesource/hawttasks/RefCounted.java
  18. +29 −0 hawttasks/src/main/java/org/fusesource/hawttasks/Retainable.java
  19. +6 −0 hawttasks/src/main/java/org/fusesource/hawttasks/Suspendable.java
  20. +384 −0 hawttasks/src/main/java/org/fusesource/hawttasks/actor/ActorProxy.java
  21. +53 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/AbstractDispatchObject.java
  22. +206 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/AbstractSerialDispatchQueue.java
  23. +80 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/BaseRefCounted.java
  24. +80 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/BaseRetainable.java
  25. +60 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/BaseSuspendable.java
  26. +58 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/DelegatingRetainable.java
  27. +46 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/QueueSupport.java
  28. +33 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/RunnableCountDownLatch.java
  29. +120 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/RunnableSupport.java
  30. +204 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/nio/NioDispatchSource.java
  31. +127 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/nio/NioSelector.java
  32. +225 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/DispatcherThread.java
  33. +171 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/GlobalDispatchQueue.java
  34. +108 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/IntegerCounter.java
  35. +150 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/SerialDispatchQueue.java
  36. +164 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/SimpleDispatcher.java
  37. +37 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/SimpleQueue.java
  38. +169 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/ThreadDispatchQueue.java
  39. +161 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/simple/TimerThread.java
  40. +380 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/util/IntrospectionSupport.java
  41. +42 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/util/StringSupport.java
  42. +108 −0 hawttasks/src/main/java/org/fusesource/hawttasks/internal/util/TimerHeap.java
  43. +27 −0 hawttasks/src/test/ide-resources/log4j.properties
  44. +134 −0 hawttasks/src/test/java/org/fusesource/hawttasks/DispatchSystemTest.java
  45. +128 −0 hawttasks/src/test/java/org/fusesource/hawttasks/NioDispatchSoruceTest.java
  46. +35 −0 hawttasks/src/test/resources/log4j.properties
  47. +203 −0 license.txt
  48. +322 −0 pom.xml
9 .gitignore
@@ -0,0 +1,9 @@
+.classpath
+.project
+.settings
+*.iml
+*.ipr
+*.iws
+webgen/out
+webgen/webgen.cache
+target
228 hawttasks-scala/pom.xml
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.fusesource.hawttasks</groupId>
+ <artifactId>hawttasks-pom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.fusesource.hawttasks</groupId>
+ <artifactId>hawttasks-scala</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <name>HawtTasks :: Scala</name>
+ <description>The HawtTasks Scala Interface</description>
+
+ <prerequisites>
+ <maven>2.0.9</maven>
+ </prerequisites>
+
+ <properties>
+
+ <junit-version>4.7</junit-version>
+ <log4j-version>1.2.14</log4j-version>
+ <scala-version>2.8.0.Beta1</scala-version>
+ <scalatest-version>1.0.1-for-scala-2.8.0.Beta1-RC7-with-test-interfaces-0.3-SNAPSHOT</scalatest-version>
+ <scala-plugin-version>2.9.1</scala-plugin-version>
+
+ <compiler.fork>false</compiler.fork>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.fusesource.hawttasks</groupId>
+ <artifactId>hawttasks</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala-version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala-version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Testing -->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+
+ <repository>
+ <id>scala-tools.org</id>
+ <name>Scala Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </repository>
+ <repository>
+ <id>scala-tools-snapshots.org</id>
+ <name>Scala Tools Maven2 Snapshot Repository</name>
+ <url>http://scala-tools.org/repo-snapshots</url>
+ </repository>
+
+ <repository>
+ <id>scala</id>
+ <name>Scala Tools</name>
+ <url>http://scala-tools.org/repo-releases/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+
+ <repository>
+ <id>snapshots.scala-tools.org</id>
+ <name>Scala-Tools Maven2 Snapshot Repository</name>
+ <url>http://scala-tools.org/repo-snapshots</url>
+ <snapshots>
+ <enabled>true</enabled>
+ <!--updatePolicy>never</updatePolicy-->
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ <!--updatePolicy>never</updatePolicy-->
+ <checksumPolicy>fail</checksumPolicy>
+ </releases>
+ </repository>
+
+ <repository>
+ <id>nexus.snapshots.scala-tools.org</id>
+ <name>Scala-Tools Maven2 Snapshot Repository</name>
+ <url>http://nexus.scala-tools.org/content/repositories/snapshots/</url>
+ <snapshots>
+ <enabled>true</enabled>
+ <!--updatePolicy>never</updatePolicy-->
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ <!--updatePolicy>never</updatePolicy-->
+ <checksumPolicy>fail</checksumPolicy>
+ </releases>
+ </repository>
+
+ </repositories>
+
+ <build>
+ <defaultGoal>install</defaultGoal>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+
+ <plugins>
+
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.13.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xmx1024m</jvmArg>
+ </jvmArgs>
+ <args>
+ <!-- arg>-unchecked</arg -->
+ <arg>-deprecation</arg>
+ <arg>-Xno-varargs-conversion</arg>
+ </args>
+ <scalaVersion>${scala-version}</scalaVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.4.3</version>
+ <configuration>
+ <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise scala compiler can't find stuff -->
+ <useSystemClassLoader>false</useSystemClassLoader>
+ <!--forkMode>pertest</forkMode-->
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <!--
+ <reporting>
+ <plugins>
+
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.13.1</version>
+ <configuration>
+ <vscaladocVersion>1.1</vscaladocVersion>
+ <scalaVersion>${scala-version}</scalaVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-changes-plugin</artifactId>
+ <version>2.3</version>
+ <reportSets>
+ <reportSet>
+ <reports>
+ <report>jira-report</report>
+ </reports>
+ </reportSet>
+ </reportSets>
+ <configuration>
+ <filteringChanges>true</filteringChanges>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ -->
+
+</project>
126 hawttasks-scala/src/main/scala/org/fusesource/hawttasks/example/Router.scala
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2009, Progress Software Corporation and/or its
+ * subsidiaries or affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.fusesource.hawttasks.example
+
+import java.util.HashMap
+import org.fusesource.hawttasks._
+import org.fusesource.hawttasks.scala._
+
+/**
+ * Provides a non-blocking concurrent producer to consumer
+ * routing implementation.
+ *
+ * Producers create a route object for each destination
+ * they will be producing to. Once the route is
+ * connected to the router, the producer can use
+ * the route.targets list without synchronization to
+ * get the current set of consumers that are bound
+ * to the destination.
+ *
+ */
+class Router[D, T <: RefCounted](var queue:TaskQueue) extends Queued {
+
+ class DestinationNode {
+ var targets = List[T]()
+ var routes = List[Route[D,T]]()
+
+ def on_bind(x:List[T]) = {
+ targets = x ::: targets
+ routes.foreach({r=>
+ r.bind(x)
+ })
+ }
+
+ def on_unbind(x:List[T]):Boolean = {
+ targets = targets.filterNot({t=>x.contains(t)})
+ routes.foreach({r=>
+ r.unbind(x)
+ })
+ routes == Nil && targets == Nil
+ }
+
+ def on_connect(route:Route[D,T]) = {
+ routes = route :: routes
+ route.connected(targets)
+ }
+
+ def on_disconnect(route:Route[D,T]):Boolean = {
+ routes = routes.filterNot({r=> route==r})
+ route.disconnected()
+ routes == Nil && targets == Nil
+ }
+ }
+
+ private var destinations = new HashMap[D, DestinationNode]()
+
+ private def get(destination:D) = {
+ var result = destinations.get(destination)
+ if( result ==null ) {
+ result = new DestinationNode
+ destinations.put(destination, result)
+ }
+ result
+ }
+
+ def bind(destination:D, targets:List[T]) = retaining(targets) {
+ get(destination).on_bind(targets)
+ } ->: queue
+
+ def unbind(destination:D, targets:List[T]) = releasing(targets) {
+ if( get(destination).on_unbind(targets) ) {
+ destinations.remove(destination)
+ }
+ } ->: queue
+
+ def connect(route:Route[D,T]) = retaining(route) {
+ get(route.destination).on_connect(route)
+ } ->: queue
+
+ def disconnect(route:Route[D,T]) = releasing(route) {
+ get(route.destination).on_disconnect(route)
+ } ->: queue
+
+}
+
+
+class Route[D, T <: RefCounted ](val destination:D, var queue:TaskQueue) extends QueuedRefCounted {
+
+ var targets = List[T]()
+
+ def connected(targets:List[T]) = retaining(targets) {
+ this.targets = this.targets ::: targets
+ on_connected
+ } ->: queue
+
+ def bind(targets:List[T]) = retaining(targets) {
+ this.targets = this.targets ::: targets
+ } ->: queue
+
+ def unbind(targets:List[T]) = releasing(targets) {
+ this.targets = this.targets.filterNot {
+ t=>targets.contains(t)
+ }
+ } ->: queue
+
+ def disconnected() = ^ {
+ release(targets)
+ targets = Nil
+ on_disconnected
+ } ->: queue
+
+ protected def on_connected = {}
+ protected def on_disconnected = {}
+}
199 hawttasks-scala/src/main/scala/org/fusesource/hawttasks/scala/RefCounted.scala
@@ -0,0 +1,199 @@
+/**
+ * Copyright (C) 2009, Progress Software Corporation and/or its
+ * subsidiaries or affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.scala
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.fusesource.hawttasks.{RefCounted, DispatchQueue, DispatchSystem}
+
+trait Service {
+ def startup() = {}
+
+ def shutdown() = {}
+}
+
+trait ServiceRetainer extends RefCounted {
+
+ protected def retainedService: Service = new Service {}
+ protected var releaseWatchers = List[Runnable]()
+
+ val retained = new AtomicInteger(1);
+ retainedService.startup
+
+ override def retain = {
+ assertRetained()
+ retained.getAndIncrement()
+ }
+
+ override def release() = {
+ assertRetained()
+ if (retained.decrementAndGet() == 0) {
+ retainedService.shutdown
+ for( onRelease <- releaseWatchers) {
+ onRelease.run
+ }
+ }
+ }
+
+ override def isReleased() = {
+ retained.get() <= 0;
+ }
+
+ protected def assertRetained() {
+ if (retained.get() <= 0) {
+ throw new IllegalStateException(format("%s: Use of object not allowed after it has been released", this.toString()));
+ }
+ }
+
+ override def addReleaseWatcher(onRelease: Runnable) {
+ releaseWatchers = onRelease :: releaseWatchers;
+ }
+
+}
+
+object TaskQueue {
+ def apply(): TaskQueue = apply(null.asInstanceOf[String])
+
+ def apply(name: String): TaskQueue =
+ apply(DispatchSystem.createSerialQueue(name))
+
+ def apply(queue: DispatchQueue): TaskQueue = new TaskQueue {
+ def apply(task: Runnable) = queue.dispatchAsync(task)
+
+ override def retain = queue.retain
+
+ override def release = queue.release
+
+ override def isReleased = queue.isReleased
+
+ override def addReleaseWatcher(onRelease:Runnable) = queue.addReleaseWatcher(onRelease)
+ }
+}
+
+trait TaskQueue extends Function1[Runnable, Unit] with RefCounted {
+ def <<(task: Runnable) = {apply(task); this}
+
+ def ->:(task: Runnable) = {apply(task); this}
+}
+
+trait Queued {
+ var queue: TaskQueue
+
+ protected def using(resource: RefCounted): (=> Unit) => Runnable = {
+ using(resource, resource) _
+ }
+
+ protected def using(resources: Seq[RefCounted]): (=> Unit) => Runnable = {
+ using(resources, resources) _
+ }
+
+ protected def retaining(resource: RefCounted): (=> Unit) => Runnable = {
+ using(resource, null) _
+ }
+
+ protected def retaining(resources: Seq[RefCounted]): (=> Unit) => Runnable = {
+ using(resources, null) _
+ }
+
+ protected def releasing(resource: RefCounted): (=> Unit) => Runnable = {
+ using(null, resource) _
+ }
+
+ protected def releasing(resources: Seq[RefCounted]): (=> Unit) => Runnable = {
+ using(null, resources) _
+ }
+
+ protected def retain(retainedResources: Seq[RefCounted]) = {
+ if (retainedResources != null) {
+ for (resource <- retainedResources) {
+ resource.retain
+ }
+ }
+ }
+
+ protected def release(releasedResources: Seq[RefCounted]) = {
+ if (releasedResources != null) {
+ for (resource <- releasedResources) {
+ resource.release
+ }
+ }
+ }
+
+ protected def ^(proc: => Unit): Runnable = new Runnable() {
+ def run() {
+ proc;
+ }
+ }
+
+ private def using(retainedResource: RefCounted, releasedResource: RefCounted)(proc: => Unit): Runnable = {
+ if (retainedResource != null) {
+ retainedResource.retain
+ }
+ new Runnable() {
+ def run = {
+ try {
+ proc;
+ } finally {
+ if (releasedResource != null) {
+ releasedResource.release
+ }
+ }
+ }
+ }
+ }
+
+ private def using(retainedResources: Seq[RefCounted], releasedResources: Seq[RefCounted])(proc: => Unit): Runnable = {
+ retain(retainedResources)
+ new Runnable() {
+ def run = {
+ try {
+ proc;
+ } finally {
+ release(releasedResources)
+ }
+ }
+ }
+ }
+
+}
+
+trait QueuedService extends Queued with Service {
+ override def startup() = {
+ queue.retain
+ }
+
+ override def shutdown() = {
+ queue << ^ {onShutdown}
+ queue.release
+ }
+
+ protected def onShutdown() = {}
+}
+
+trait QueuedRefCounted extends Queued with ServiceRetainer {
+ protected override def retainedService = new Service {
+ override def startup() = {
+ queue.retain
+ }
+
+ override def shutdown() = {
+ queue << ^ {onShutdown}
+ queue.release
+ }
+ }
+
+ protected def onShutdown() = {}
+}
127 hawttasks-scala/src/test/scala/org/fusesource/hawttasks/example/RouterTest.scala
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2009, Progress Software Corporation and/or its
+ * subsidiaries or affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.fusesource.hawttasks.example
+
+import org.scalatest.FunSuite
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.fusesource.hawttasks.scala._
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@RunWith(classOf[JUnitRunner])
+class RouterTest extends FunSuite {
+
+
+ test("Using a Router") {
+ useRouter
+ }
+
+ def useRouter() {
+ var latch = new CountDownLatch(3);
+ class Consumer(var queue:TaskQueue) extends QueuedRefCounted {
+ def deliver(msg:String) = queue << ^ {
+ println("Consumer got: "+msg)
+ latch.countDown
+ }
+ }
+
+ class Producer(var queue:TaskQueue) extends QueuedRefCounted {
+
+ // Route is also a reactor.. the Router object will be sending
+ // it messages for it to update it's target list.
+ val route = new Route[String,Consumer]("FOO.QUEUE", queue) {
+
+ // This event is fired once the router is
+ // has connected the route.
+ override def on_connected = {
+ send_function();
+ }
+ }
+
+ var send_function: ()=>Unit = null;
+
+ def send(msg:String) = queue << ^ {
+ route.targets.foreach(t=>{
+ t.deliver(msg)
+ })
+ }
+
+
+ override def onShutdown = {
+ route.release;
+ }
+ }
+
+ val router = new Router[String,Consumer](TaskQueue("router"))
+ val consumer = new Consumer(TaskQueue("consumer"));
+
+ router.bind("FOO.QUEUE", consumer::Nil )
+ consumer.release
+
+
+ // Producer is a reactor.. which uses a
+ // nested reactor sharing the same dispatch queue.
+ val producer = new Producer(TaskQueue("producer"))
+
+ // the following gets called once the producer
+ // is 'connected' to the router.
+ producer.send_function = { ()=>
+ producer send "message 1"
+ producer send "message 2"
+ producer send "message 3"
+ }
+
+ router.connect(producer.route)
+
+ // wait for all messages to be sent and received..
+ assert(latch.await(1, TimeUnit.SECONDS))
+
+
+ // consumer should not be released until it gets unbound.
+ assert( !consumer.isReleased )
+
+ // unbinding him should release him.
+ router.unbind("FOO.QUEUE", consumer::Nil )
+ assertWithin(2, TimeUnit.SECONDS) {
+ consumer.isReleased
+ }
+
+ // producer should not be released until it gets unbound.
+ producer.release;
+ assert( !producer.route.isReleased )
+ router.disconnect(producer.route)
+ assertWithin(2, TimeUnit.SECONDS) {
+ producer.route.isReleased
+ }
+
+
+ }
+
+ def assertWithin(timeout:Long, unit:TimeUnit)( proc: =>Boolean ) {
+ val stopAt = System.currentTimeMillis + unit.toMillis(timeout);
+ while( System.currentTimeMillis < stopAt ) {
+ if( proc ) {
+ return;
+ }
+ Thread.sleep(50);
+ }
+ assert(proc)
+ }
+}
+
65 hawttasks/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.fusesource.hawttasks</groupId>
+ <artifactId>hawttasks-pom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.fusesource.hawttasks</groupId>
+ <artifactId>hawttasks</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <name>HawtTasks :: Java</name>
+ <description>HawtTasks Core Java API</description>
+
+ <properties>
+ <junit-version>4.7</junit-version>
+ <asm-version>3.1</asm-version>
+ <log4j-version>1.2.14</log4j-version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>asm</groupId>
+ <artifactId>asm-tree</artifactId>
+ <version>${asm-version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
30 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchObject.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchObject extends Suspendable {
+
+ public <Context> Context getContext();
+ public <Context> void setContext(Context context);
+
+ public void setTargetQueue(DispatchQueue queue);
+ public DispatchQueue getTargetQueue();
+}
37 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchOption.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.fusesource.hawttasks;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum DispatchOption {
+ /**
+ * Updates the target queue to be the
+ * thread queue so that execution 'sticks' to caller's
+ * thread queue.
+ */
+ STICK_TO_CALLER_THREAD,
+
+ /**
+ * Used to update the target queue to be the first
+ * random thread queue that dispatches this queue.
+ */
+ STICK_TO_DISPATCH_THREAD,
+}
28 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchPriority.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.fusesource.hawttasks;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum DispatchPriority {
+ HIGH,
+ DEFAULT,
+ LOW;
+}
40 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchQueue.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchQueue extends DispatchObject, Executor {
+
+ public DispatchQueue createSerialQueue(String label, DispatchOption... options);
+
+ public void dispatchAsync(Runnable runnable);
+ public void dispatchSync(Runnable runnable) throws InterruptedException;
+
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit);
+ public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException;
+
+ public String getLabel();
+ public Set<DispatchOption> getOptions();
+
+}
31 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchSource.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchSource extends DispatchObject {
+
+ public void cancel();
+ public boolean isCanceled();
+
+ public void setCancelHandler(Runnable cancelHandler);
+ public void setEventHandler(Runnable eventHandler);
+
+}
62 hawttasks/src/main/java/org/fusesource/hawttasks/DispatchSystem.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+import java.nio.channels.SelectableChannel;
+
+
+/**
+ * Provides easy access to a system wide Dispatcher.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchSystem {
+
+ final private static Dispatcher dispatcher = create();
+
+ private static Dispatcher create() {
+ Dispatcher rc = new DispatcherConfig().createDispatcher();
+ rc.resume();
+ return rc;
+ }
+
+ public static DispatchQueue getMainQueue() {
+ return dispatcher.getMainQueue();
+ }
+
+ public static DispatchQueue getGlobalQueue() {
+ return dispatcher.getGlobalQueue();
+ }
+
+ public static DispatchQueue getGlobalQueue(DispatchPriority priority) {
+ return dispatcher.getGlobalQueue(priority);
+ }
+
+ public static DispatchQueue createSerialQueue(String label, DispatchOption...options) {
+ return dispatcher.createSerialQueue(label, options);
+ }
+
+ public static void dispatchMain() {
+ dispatcher.dispatchMain();
+ }
+
+ public static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+ return dispatcher.createSource(channel, interestOps, queue);
+ }
+
+
+}
41 hawttasks/src/main/java/org/fusesource/hawttasks/Dispatcher.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.fusesource.hawttasks;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Dispatcher extends Suspendable {
+
+ public DispatchQueue getGlobalQueue();
+ public DispatchQueue getGlobalQueue(DispatchPriority priority);
+
+ public DispatchQueue createSerialQueue(String label, DispatchOption... options);
+
+ public DispatchQueue getMainQueue();
+ public void dispatchMain();
+
+ public DispatchQueue getCurrentQueue();
+
+ public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+
+
+}
30 hawttasks/src/main/java/org/fusesource/hawttasks/DispatcherAware.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+
+/**
+ * Handy interface to signal classes which would like an Dispatcher instance
+ * injected into them.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatcherAware {
+
+ public void setDispatcher(Dispatcher dispatcher);
+
+}
66 hawttasks/src/main/java/org/fusesource/hawttasks/DispatcherConfig.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+import org.fusesource.hawttasks.internal.simple.SimpleDispatcher;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatcherConfig {
+
+ private String label="default";
+ private int threads=Runtime.getRuntime().availableProcessors();
+ private DispatcherObserver observer;
+
+ public static Dispatcher create(String name, int threads) {
+ DispatcherConfig config = new DispatcherConfig();
+ config.label=name;
+ config.threads=threads;
+ return config.createDispatcher();
+ }
+
+ public Dispatcher createDispatcher() {
+ return new SimpleDispatcher(this);
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String name) {
+ this.label = name;
+ }
+
+ public int getThreads() {
+ return threads;
+ }
+
+ public void setThreads(int threads) {
+ this.threads = threads;
+ }
+
+ public DispatcherObserver getObserver() {
+ return observer;
+ }
+
+ public void setObserver(DispatcherObserver observer) {
+ this.observer = observer;
+ }
+
+}
39 hawttasks/src/main/java/org/fusesource/hawttasks/DispatcherObserver.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.fusesource.hawttasks;
+
+import org.fusesource.hawttasks.internal.simple.DispatcherThread;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatcherObserver {
+
+ public void onThreadCreate(DispatcherThread thread);
+ public void onThreadDestroy(DispatcherThread thread);
+
+ public void onQueueCreate(DispatchQueue queue, DispatchOption...options);
+ public void onQueueDestroy(DispatchQueue queue);
+
+ public void onSourceCreate(DispatchSource source);
+ public void onSourceDestroy(DispatchSource source);
+
+ public void onDispatchRequest(DispatchQueue target, Runnable request);
+
+}
53 hawttasks/src/main/java/org/fusesource/hawttasks/RefCounted.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+/**
+ * Implemented by objects which use a reference counted life cycle.
+ *
+ * Objects start with a ref count of one. Retaining the object increments the counter,
+ * releasing, decrements the counter. When the counter reaches zero, the object should
+ * not longer be accessed as it will release any resources it needed to perform normal
+ * processing.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface RefCounted {
+
+ /**
+ * Increments the reference counter
+ */
+ public void retain();
+
+ /**
+ * Decrements the reference counter
+ */
+ public void release();
+
+ /**
+ * @return true if the reference counter is zero
+ */
+ public boolean isReleased();
+
+ /**
+ * adds a runnable which will be exectued once this object's
+ * reference counter reaches zero.
+ * @param onRelease
+ */
+ public void addReleaseWatcher(Runnable onRelease);
+
+}
29 hawttasks/src/main/java/org/fusesource/hawttasks/Retainable.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Retainable {
+
+ public void retain();
+ public void release();
+ public void addShutdownWatcher(Runnable shutdownWatcher);
+ public boolean isShutdown();
+}
6 hawttasks/src/main/java/org/fusesource/hawttasks/Suspendable.java
@@ -0,0 +1,6 @@
+package org.fusesource.hawttasks;
+
+public interface Suspendable extends RefCounted {
+ public void suspend();
+ public void resume();
+}
384 hawttasks/src/main/java/org/fusesource/hawttasks/actor/ActorProxy.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.actor;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.fusesource.hawttasks.DispatchQueue;
+import org.objectweb.asm.ClassWriter;
+import org.objectweb.asm.FieldVisitor;
+import org.objectweb.asm.Label;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+
+import static org.objectweb.asm.Type.*;
+
+import static org.objectweb.asm.ClassWriter.*;
+
+/**
+ * This class creates proxy objects that allow you to easily implement the
+ * actor pattern in java using a {@link DispatchQueue}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ActorProxy {
+
+ public static <T> T create(Class<T> interfaceClass, T target, DispatchQueue queue) throws IllegalArgumentException {
+ return create(target.getClass().getClassLoader(), interfaceClass, target, queue);
+ }
+
+ synchronized public static <T> T create(ClassLoader classLoader, Class<T> interfaceClass, T target, DispatchQueue queue) throws IllegalArgumentException {
+ Class<T> proxyClass = getProxyClass(classLoader, interfaceClass);
+ Constructor<?> constructor = proxyClass.getConstructors()[0];
+ Object rc;
+ try {
+ rc = constructor.newInstance(new Object[]{target, queue});
+ } catch (Throwable e) {
+ throw new RuntimeException("Could not create an instance of the proxy due to: "+e.getMessage(), e);
+ }
+ return proxyClass.cast(rc);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<T> getProxyClass(ClassLoader loader, Class<T> interfaceClass) throws IllegalArgumentException {
+ String proxyName = proxyName(interfaceClass);
+ try {
+ return (Class<T>) loader.loadClass(proxyName);
+ } catch (ClassNotFoundException e) {
+ Generator generator = new Generator(loader, interfaceClass);
+ return generator.generate();
+ }
+ }
+
+ static private String proxyName(Class<?> clazz) {
+ return clazz.getName()+"$__ACTOR_PROXY__";
+ }
+
+ private static final class Generator<T> implements Opcodes {
+
+ private static final String RUNNABLE = "java/lang/Runnable";
+ private static final String OBJECT_CLASS = "java/lang/Object";
+ private static final String DISPATCH_QUEUE = DispatchQueue.class.getName().replace('.','/');
+
+ private final ClassLoader loader;
+ private Method defineClassMethod;
+
+ private final Class<T> interfaceClass;
+ private String proxyName;
+ private String interfaceName;
+
+ private Generator(ClassLoader loader, Class<T> interfaceClass) throws RuntimeException {
+ this.loader = loader;
+ this.interfaceClass = interfaceClass;
+ this.proxyName = proxyName(interfaceClass).replace('.', '/');
+ this.interfaceName = interfaceClass.getName().replace('.','/');
+
+ try {
+ defineClassMethod = java.lang.ClassLoader.class.getDeclaredMethod("defineClass", new Class[] { String.class, byte[].class, int.class, int.class });
+ defineClassMethod.setAccessible(true);
+ } catch (Throwable e) {
+ throw new RuntimeException("Could not access the 'java.lang.ClassLoader.defineClass' method due to: "+e.getMessage(), e);
+ }
+ }
+
+ private Class<T> generate() throws IllegalArgumentException {
+
+ // Define all the runnable classes used for each method.
+ Method[] methods = interfaceClass.getMethods();
+ for (int index = 0; index < methods.length; index++) {
+ String name = runnable(index, methods[index]).replace('/', '.');
+ byte[] clazzBytes = dumpRunnable(index, methods[index]);
+ defineClass(name, clazzBytes);
+ }
+
+ String name = proxyName.replace('/', '.');
+ byte[] clazzBytes = dumpProxy(methods);
+ return defineClass(name, clazzBytes);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Class<T> defineClass(String name, byte[] classBytes) throws RuntimeException {
+ try {
+ return (Class<T>) defineClassMethod.invoke(loader, new Object[] {name, classBytes, 0, classBytes.length});
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Could not define the generated class due to: "+e.getMessage(), e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("Could not define the generated class due to: "+e.getMessage(), e);
+ }
+ }
+
+ public byte[] dumpProxy(Method[] methods) {
+ ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
+ FieldVisitor fv;
+ MethodVisitor mv;
+ Label start, end;
+
+ // example:
+ cw.visit(V1_4, ACC_PUBLIC + ACC_SUPER, proxyName, null, OBJECT_CLASS, new String[] { interfaceName });
+ {
+ // example:
+ fv = cw.visitField(ACC_PRIVATE + ACC_FINAL, "queue", sig(DISPATCH_QUEUE), null, null);
+ fv.visitEnd();
+
+ // example:
+ fv = cw.visitField(ACC_PRIVATE + ACC_FINAL, "target", sig(interfaceName), null, null);
+ fv.visitEnd();
+
+ // example: public PizzaServiceCustomProxy(IPizzaService target, DispatchQueue queue)
+ mv = cw.visitMethod(ACC_PUBLIC, "<init>", "(" + sig(interfaceName) + sig(DISPATCH_QUEUE) + ")V", null, null);
+ {
+ mv.visitCode();
+
+ // example: super();
+ start = new Label();
+ mv.visitLabel(start);
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitMethodInsn(INVOKESPECIAL, OBJECT_CLASS, "<init>", "()V");
+
+ // example: queue=queue;
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitVarInsn(ALOAD, 2);
+ mv.visitFieldInsn(PUTFIELD, proxyName, "queue", sig(DISPATCH_QUEUE));
+
+ // example: this.target=target;
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitVarInsn(ALOAD, 1);
+ mv.visitFieldInsn(PUTFIELD, proxyName, "target", sig(interfaceName));
+
+ // example: return;
+ mv.visitInsn(RETURN);
+
+ end = new Label();
+ mv.visitLabel(end);
+ mv.visitLocalVariable("this", sig(proxyName), null, start, end, 0);
+ mv.visitLocalVariable("target", sig(interfaceName), null, start, end, 1);
+ mv.visitLocalVariable("queue", sig(DISPATCH_QUEUE), null, start, end, 2);
+ mv.visitMaxs(2, 3);
+ }
+ mv.visitEnd();
+
+ for (int index = 0; index < methods.length; index++) {
+ Method method = methods[index];
+
+ Class<?>[] params = method.getParameterTypes();
+ Type[] types = Type.getArgumentTypes(method);
+
+ String methodSig = Type.getMethodDescriptor(method);
+
+ // example: public void order(final long count)
+ mv = cw.visitMethod(ACC_PUBLIC, method.getName(), methodSig, null, null);
+ {
+ mv.visitCode();
+
+ // example: queue.dispatchAsync(new OrderRunnable(target, count));
+ start = new Label();
+ mv.visitLabel(start);
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitFieldInsn(GETFIELD, proxyName, "queue", sig(DISPATCH_QUEUE));
+ mv.visitTypeInsn(NEW, runnable(index, methods[index]));
+ mv.visitInsn(DUP);
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitFieldInsn(GETFIELD, proxyName, "target", sig(interfaceName));
+
+ for (int i = 0; i < params.length; i++) {
+ mv.visitVarInsn(types[i].getOpcode(ILOAD), 1+i);
+ }
+
+ mv.visitMethodInsn(INVOKESPECIAL, runnable(index, methods[index]), "<init>", "(" + sig(interfaceName) + sig(params) +")V");
+ mv.visitMethodInsn(INVOKEINTERFACE, DISPATCH_QUEUE, "dispatchAsync", "(" + sig(RUNNABLE) + ")V");
+
+ Type returnType = Type.getType(method.getReturnType());
+ Integer returnValue = defaultConstant(returnType);
+ if( returnValue!=null ) {
+ mv.visitInsn(returnValue);
+ }
+ mv.visitInsn(returnType.getOpcode(IRETURN));
+
+ end = new Label();
+ mv.visitLabel(end);
+ mv.visitLocalVariable("this", sig(proxyName), null, start, end, 0);
+ for (int i = 0; i < params.length; i++) {
+ mv.visitLocalVariable("param"+i, sig(params[i]), null, start, end, 1+i);
+ }
+ mv.visitMaxs(0, 0);
+ }
+ mv.visitEnd();
+ }
+ }
+ cw.visitEnd();
+
+ return cw.toByteArray();
+ }
+
+ private Integer defaultConstant(Type returnType) {
+ Integer value=null;
+ switch(returnType.getSort()) {
+ case BOOLEAN:
+ case CHAR:
+ case BYTE:
+ case SHORT:
+ case INT:
+ value = ICONST_0;
+ break;
+ case Type.LONG:
+ value = LCONST_0;
+ break;
+ case Type.FLOAT:
+ value = FCONST_0;
+ break;
+ case Type.DOUBLE:
+ value = DCONST_0;
+ break;
+ case ARRAY:
+ case OBJECT:
+ value = ACONST_NULL;
+ }
+ return value;
+ }
+
+ public byte[] dumpRunnable(int index, Method method) {
+
+ ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
+ FieldVisitor fv;
+ MethodVisitor mv;
+ Label start, end;
+
+ // example: final class OrderRunnable implements Runnable
+ String runnableClassName = runnable(index, method);
+ cw.visit(V1_4, ACC_FINAL + ACC_SUPER, runnableClassName, null, OBJECT_CLASS, new String[] { RUNNABLE });
+ {
+
+ // example: private final IPizzaService target;
+ fv = cw.visitField(ACC_PRIVATE + ACC_FINAL, "target", sig(interfaceName), null, null);
+ fv.visitEnd();
+
+ // TODO.. add field for each method parameter
+ // example: private final long count;
+
+ Class<?>[] params = method.getParameterTypes();
+ Type[] types = Type.getArgumentTypes(method);
+
+ for (int i = 0; i < params.length; i++) {
+ fv = cw.visitField(ACC_PRIVATE + ACC_FINAL, "param"+i, sig(params[i]), null, null);
+ fv.visitEnd();
+ }
+
+ // example: public OrderRunnable(IPizzaService target, long count)
+ mv = cw.visitMethod(ACC_PUBLIC, "<init>", "(" + sig(interfaceName)+sig(params)+")V", null, null);
+ {
+ mv.visitCode();
+
+ // example: super();
+ start = new Label();
+ mv.visitLabel(start);
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitMethodInsn(INVOKESPECIAL, OBJECT_CLASS, "<init>", "()V");
+
+ // example: this.target = target;
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitVarInsn(ALOAD, 1);
+ mv.visitFieldInsn(PUTFIELD, runnableClassName, "target", sig(interfaceName));
+
+ // example: this.count = count;
+ for (int i = 0; i < params.length; i++) {
+
+ // TODO: figure out how to do the right loads. it varies with the type.
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitVarInsn(types[i].getOpcode(ILOAD), 2+i);
+ mv.visitFieldInsn(PUTFIELD, runnableClassName, "param"+i, sig(params[i]));
+
+ }
+
+ // example: return;
+ mv.visitInsn(RETURN);
+
+ end = new Label();
+ mv.visitLabel(end);
+ mv.visitLocalVariable("this", sig(runnableClassName), null, start, end, 0);
+ mv.visitLocalVariable("target", sig(interfaceName), null, start, end, 1);
+
+ for (int i = 0; i < params.length; i++) {
+ mv.visitLocalVariable("param"+i, sig(params[i]), null, start, end, 2+i);
+ }
+ mv.visitMaxs(0, 0);
+ }
+ mv.visitEnd();
+
+ // example: public void run()
+ mv = cw.visitMethod(ACC_PUBLIC, "run", "()V", null, null);
+ {
+ mv.visitCode();
+
+ // example: target.order(count);
+ start = new Label();
+ mv.visitLabel(start);
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitFieldInsn(GETFIELD, runnableClassName, "target", sig(interfaceName));
+
+ for (int i = 0; i < params.length; i++) {
+ mv.visitVarInsn(ALOAD, 0);
+ mv.visitFieldInsn(GETFIELD, runnableClassName, "param"+i, sig(params[i]));
+ }
+
+ String methodSig = Type.getMethodDescriptor(method);
+ mv.visitMethodInsn(INVOKEINTERFACE, interfaceName, method.getName(), methodSig);
+
+ Type returnType = Type.getType(method.getReturnType());
+ if( returnType != VOID_TYPE ) {
+ mv.visitInsn(POP);
+ }
+
+ // example: return;
+ mv.visitInsn(RETURN);
+
+ end = new Label();
+ mv.visitLabel(end);
+ mv.visitLocalVariable("this", sig(runnableClassName), null, start, end, 0);
+ mv.visitMaxs(0, 0);
+ }
+ mv.visitEnd();
+ }
+ cw.visitEnd();
+
+ return cw.toByteArray();
+ }
+
+
+ private String sig(Class<?>[] params) {
+ StringBuilder methodSig = new StringBuilder();
+ for (int i = 0; i < params.length; i++) {
+ methodSig.append(sig(params[i]));
+ }
+ return methodSig.toString();
+ }
+
+ private String sig(Class<?> clazz) {
+ return Type.getDescriptor(clazz);
+ }
+
+ private String runnable(int index, Method method) {
+ return proxyName+"$"+index+"$"+method.getName();
+ }
+
+ private String sig(String name) {
+ return "L"+name+";";
+ }
+ }
+}
53 hawttasks/src/main/java/org/fusesource/hawttasks/internal/AbstractDispatchObject.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import org.fusesource.hawttasks.DispatchObject;
+import org.fusesource.hawttasks.DispatchQueue;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractDispatchObject extends BaseSuspendable implements DispatchObject {
+
+ protected volatile Object context;
+
+ protected volatile DispatchQueue targetQueue;
+
+ @SuppressWarnings("unchecked")
+ public <Context> Context getContext() {
+ assertRetained();
+ return (Context) context;
+ }
+
+ public <Context> void setContext(Context context) {
+ assertRetained();
+ this.context = context;
+ }
+
+ public void setTargetQueue(DispatchQueue targetQueue) {
+ assertRetained();
+ this.targetQueue = targetQueue;
+ }
+
+ public DispatchQueue getTargetQueue() {
+ assertRetained();
+ return this.targetQueue;
+ }
+
+}
206 hawttasks/src/main/java/org/fusesource/hawttasks/internal/AbstractSerialDispatchQueue.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.fusesource.hawttasks.DispatchOption;
+import org.fusesource.hawttasks.DispatchQueue;
+import org.fusesource.hawttasks.internal.simple.IntegerCounter;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractSerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+ protected final String label;
+ protected final AtomicInteger executeCounter = new AtomicInteger();
+
+ protected final AtomicLong externalQueueSize = new AtomicLong();
+ protected final AtomicLong size = new AtomicLong();
+ protected final ConcurrentLinkedQueue<Runnable> externalQueue = new ConcurrentLinkedQueue<Runnable>();
+
+ private final LinkedList<Runnable> localQueue = new LinkedList<Runnable>();
+ private final ThreadLocal<Boolean> executing = new ThreadLocal<Boolean>();
+
+ protected final Set<DispatchOption> options;
+
+// @Override
+// public void retain() {
+// super.retain();
+// if( label!=null && label.startsWith("producer"))
+// System.out.println(format("retain: %s", this.toString()));
+// }
+//
+// @Override
+// public void release() {
+// super.release();
+// if( label!=null && label.startsWith("producer"))
+// System.out.println(format("release: %s", this.toString()));
+// }
+
+ public AbstractSerialDispatchQueue(String label, DispatchOption...options) {
+ this.label = label;
+ this.options = set(options);
+ }
+
+ static private Set<DispatchOption> set(DispatchOption[] options) {
+ if( options==null || options.length==0 )
+ return Collections.emptySet() ;
+ return Collections.unmodifiableSet(EnumSet.copyOf(Arrays.asList(options)));
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ @Override
+ protected void onStartup() {
+ dispatchSelfAsync();
+ }
+
+ @Override
+ protected void onResume() {
+ dispatchSelfAsync();
+ }
+
+ public void execute(Runnable command) {
+ assertRetained();
+ dispatchAsync(command);
+ }
+
+ public void dispatchAsync(Runnable runnable) {
+ assert runnable != null;
+ assertRetained();
+
+ long sizeWas = size.getAndIncrement();
+
+ // We can take a shortcut...
+ if( executing.get()!=null ) {
+ localQueue.add(runnable);
+ } else {
+ if( sizeWas==0 ) {
+ retain();
+ }
+
+ long lastSize = externalQueueSize.getAndIncrement();
+ externalQueue.add(runnable);
+ if( lastSize == 0 && suspended.get()<=0 ) {
+ dispatchSelfAsync();
+ }
+ }
+ }
+
+ protected void dispatchSelfAsync() {
+ targetQueue.dispatchAsync(this);
+ }
+
+ public void run() {
+ IntegerCounter limit = new IntegerCounter();
+ limit.set(1000);
+ dispatch(limit);
+ }
+
+ protected void dispatch(IntegerCounter limit) {
+ executing.set(true);
+ // Protection against concurrent execution...
+ // Many threads can try to get in.. but only the first will win..
+ if( executeCounter.getAndIncrement()==0 ) {
+ // Do additional loops for each thread that could
+ // not make it in. This protects us from exiting
+ // the dispatch loop but still just after a new
+ // thread was trying to get in.
+ do {
+ dispatchLoop(limit);
+ } while( executeCounter.decrementAndGet()>0 );
+ }
+ executing.remove();
+ }
+
+ private void dispatchLoop(IntegerCounter limit) {
+ int counter=0;
+ try {
+
+ Runnable runnable;
+ while( suspended.get() <= 0 ) {
+
+ if( (runnable = localQueue.poll())!=null ) {
+ counter++;
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ if( limit.decrementAndGet() <= 0 ) {
+ return;
+ }
+ continue;
+ }
+
+ long lsize = externalQueueSize.get();
+ if( lsize>0 ) {
+ while( lsize > 0 ) {
+ runnable = externalQueue.poll();
+ if( runnable!=null ) {
+ localQueue.add(runnable);
+ lsize = externalQueueSize.decrementAndGet();
+ }
+ }
+ continue;
+ }
+
+ break;
+ }
+
+ } finally {
+ if( counter>0 ) {
+ long lsize = size.addAndGet(-counter);
+ assert lsize >= 0;
+ if( lsize==0 ) {
+ release();
+ } else {
+ dispatchSelfAsync();
+ }
+ }
+ }
+ }
+
+ public void dispatchSync(Runnable runnable) throws InterruptedException {
+ assertRetained();
+ dispatchApply(1, runnable);
+ }
+
+ public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+ assertRetained();
+ QueueSupport.dispatchApply(this, iterations, runnable);
+ }
+
+ public Set<DispatchOption> getOptions() {
+ assertRetained();
+ return options;
+ }
+
+
+}
80 hawttasks/src/main/java/org/fusesource/hawttasks/internal/BaseRefCounted.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.fusesource.hawttasks.RefCounted;
+
+import static java.lang.String.*;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BaseRefCounted implements RefCounted {
+
+ final protected AtomicInteger retained = new AtomicInteger(1);
+ final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>(1);
+
+ public void addReleaseWatcher(Runnable shutdownHandler) {
+ assertRetained();
+ synchronized (shutdownHandlers) {
+ shutdownHandlers.add(shutdownHandler);
+ }
+ }
+
+ public void retain() {
+ assertRetained();
+ retained.getAndIncrement();
+ }
+
+ public void release() {
+ assertRetained();
+ if (retained.decrementAndGet() == 0) {
+ onShutdown();
+ }
+ }
+
+ final protected void assertRetained() {
+ if( retained.get() <= 0 ) {
+// System.out.println(format("!!!!!!!! %s: Use of object not allowed after it has been released", this.toString()));
+ throw new IllegalStateException(format("%s: Use of object not allowed after it has been released", this.toString()));
+ }
+// assert retained.get() > 0 : format("%s: Use of object not allowed after it has been released", this.toString());
+ }
+
+ public boolean isReleased() {
+ return retained.get() <= 0;
+ }
+
+ /**
+ * Subclasses should override if they want to do clean up.
+ */
+ protected void onShutdown() {
+ ArrayList<Runnable> copy;
+ synchronized (shutdownHandlers) {
+ copy = new ArrayList<Runnable>(shutdownHandlers);
+ shutdownHandlers.clear();
+ }
+ for (Runnable runnable : copy) {
+ runnable.run();
+ }
+ }
+
+}
80 hawttasks/src/main/java/org/fusesource/hawttasks/internal/BaseRetainable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.fusesource.hawttasks.Retainable;
+
+import static java.lang.String.*;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BaseRetainable implements Retainable{
+
+ final protected AtomicInteger retained = new AtomicInteger(1);
+ final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>(1);
+
+ public void addShutdownWatcher(Runnable shutdownHandler) {
+ assertRetained();
+ synchronized (shutdownHandlers) {
+ shutdownHandlers.add(shutdownHandler);
+ }
+ }
+
+ public void retain() {
+ assertRetained();
+ retained.getAndIncrement();
+ }
+
+ public void release() {
+ assertRetained();
+ if (retained.decrementAndGet() == 0) {
+ onShutdown();
+ }
+ }
+
+ final protected void assertRetained() {
+ if( retained.get() <= 0 ) {
+// System.out.println(format("!!!!!!!! %s: Use of object not allowed after it has been released", this.toString()));
+ throw new IllegalStateException(format("%s: Use of object not allowed after it has been released", this.toString()));
+ }
+// assert retained.get() > 0 : format("%s: Use of object not allowed after it has been released", this.toString());
+ }
+
+ public boolean isShutdown() {
+ return retained.get() <= 0;
+ }
+
+ /**
+ * Subclasses should override if they want to do clean up.
+ */
+ protected void onShutdown() {
+ ArrayList<Runnable> copy;
+ synchronized (shutdownHandlers) {
+ copy = new ArrayList<Runnable>(shutdownHandlers);
+ shutdownHandlers.clear();
+ }
+ for (Runnable runnable : copy) {
+ runnable.run();
+ }
+ }
+
+}
60 hawttasks/src/main/java/org/fusesource/hawttasks/internal/BaseSuspendable.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.fusesource.hawttasks.Suspendable;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BaseSuspendable extends BaseRefCounted implements Suspendable {
+
+ protected final AtomicBoolean startup = new AtomicBoolean(true);
+ protected final AtomicInteger suspended = new AtomicInteger();
+
+ public void resume() {
+ assertRetained();
+ if (suspended.decrementAndGet() == 0) {
+ if (startup.compareAndSet(true, false)) {
+ onStartup();
+ } else {
+ onResume();
+ }
+ }
+ }
+
+ public void suspend() {
+ assertRetained();
+ if (suspended.getAndIncrement() == 0) {
+ onSuspend();
+ }
+ }
+
+ protected void onStartup() {
+ }
+
+ protected void onSuspend() {
+ }
+
+ protected void onResume() {
+ }
+
+}
58 hawttasks/src/main/java/org/fusesource/hawttasks/internal/DelegatingRetainable.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import org.fusesource.hawttasks.Retainable;
+
+import static java.lang.String.*;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DelegatingRetainable implements Retainable {
+
+ protected final Retainable retainable;
+
+ public DelegatingRetainable(Retainable retainable) {
+ this.retainable = retainable;
+ }
+
+ public void addShutdownWatcher(Runnable shutdownWatcher) {
+ retainable.addShutdownWatcher(shutdownWatcher);
+ }
+
+ public void release() {
+ retainable.release();
+ }
+
+ public void retain() {
+ retainable.retain();
+ }
+
+ public boolean isShutdown() {
+ return retainable.isShutdown();
+ }
+
+ final protected void assertRetained() {
+// if( retained.get() <= 0 ) {
+// throw new IllegalStateException(format("%s: Use of object not allowed after it has been released", this.toString()));
+// }
+ assert !isShutdown() : format("Use of object not allowed after it has been released: %s", this.toString());
+ }
+
+}
46 hawttasks/src/main/java/org/fusesource/hawttasks/internal/QueueSupport.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.fusesource.hawttasks.DispatchQueue;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueSupport {
+
+ static public void dispatchApply(DispatchQueue queue, int itterations, final Runnable runnable) throws InterruptedException {
+ final CountDownLatch done = new CountDownLatch(itterations);
+ Runnable wrapper = new Runnable() {
+ public void run() {
+ try {
+ runnable.run();
+ } finally {
+ done.countDown();
+ }
+ }
+ };
+ for( int i=0; i < itterations; i++ ) {
+ queue.dispatchAsync(wrapper);
+ }
+ done.await();
+ }
+
+}
33 hawttasks/src/main/java/org/fusesource/hawttasks/internal/RunnableCountDownLatch.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.fusesource.hawttasks.internal;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class RunnableCountDownLatch extends CountDownLatch implements Runnable {
+ public RunnableCountDownLatch(int count) {
+ super(count);
+ }
+ public void run() {
+ countDown();
+ }
+}
120 hawttasks/src/main/java/org/fusesource/hawttasks/internal/RunnableSupport.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawttasks.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.fusesource.hawttasks.DispatchQueue;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class RunnableSupport {
+
+ private static Runnable NO_OP = new Runnable() {
+ public void run() {
+ }
+ public String toString() {
+ return "{}";
+ };
+ };
+
+ public static Runnable runNoop() {
+ return NO_OP;
+ }