Alex Vigdor edited this page Aug 28, 2018 · 2 revisions

Multithreaded programming in Groovity

As a JVM-based language, Groovity lets you use raw Threads and Executors, the standard building blocks of multithreaded applications in Java. However, Groovity also offers simple, convenient abstractions for common multithreading use cases through a combination of four special tags: async, accept, offer and await.

Async

The most straightforward way to execute code on another thread is the "async" tag. Under the hood it uses a special deadlock-free executor, which preserves application stability by NOT guaranteeing that every async call actually runs on another thread. Deadlock is a common multithreading failure in which threads get stuck waiting on each other; the executor detects situations that might lead to deadlock and avoids them by occasionally forcing an async call to execute on the calling thread.
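Groovity's executor is not shown on this page, so the following is only a loosely analogous, standard-Java illustration of the "run it on the caller instead" idea: a ThreadPoolExecutor configured with ThreadPoolExecutor.CallerRunsPolicy runs a task on the submitting thread when the pool cannot accept it. Note that this policy reacts to pool saturation rather than detecting deadlock-prone situations, and the class and method names here are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a saturated pool falls back to running work on the calling thread.
// This is standard java.util.concurrent behavior, not Groovity's deadlock-free executor.
class CallerRunsDemo {

    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),            // no buffering: a busy pool cannot queue work
                new ThreadPoolExecutor.CallerRunsPolicy());  // rejected work runs on the submitter
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];
        try {
            // Occupy the pool's only worker until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is saturated, so this task is rejected and run synchronously right here.
            pool.execute(() -> ranOn[0] = Thread.currentThread());
            return ranOn[0] == caller;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow ran on caller thread: " + overflowRunsOnCaller());
    }
}
```

The trade-off is the same one the async tag makes: occasionally giving up parallelism is preferable to letting work pile up with no thread able to run it.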

System.out.println "Hello"
async{
  System.out.println "There"
}
System.out.println "World"

For example, the code above could print "Hello", "There", "World" or "Hello", "World", "There" (one word per line), because there is no guaranteed order of execution across threads.

The return value of async is a Java Future; calling get() on the Future blocks until the operation has completed.
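For readers coming from plain Java, the contract is that of java.util.concurrent.Future: get() parks the caller until the result is ready and rethrows any failure wrapped in an ExecutionException. The sketch below uses a standard ExecutorService in place of async (Groovity's executor is not involved), and the class name is illustrative only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative plain-Java equivalent of "def f = async{ ... }; f.get()".
class FutureDemo {

    static String fireAndWait() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> "There"); // runs on the pool thread
            // ... the calling thread is free to do other work here ...
            return future.get(); // blocks until the task has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // get() rethrows any exception thrown by the task, wrapped in ExecutionException
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fireAndWait()); // There
    }
}
```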

Async is useful by itself when you just want to fire off work on another thread and forget about it, but in many cases you will want to do something with the result of an asynchronous call. Groovity supports two approaches to dealing with async results: "await" lets one thread wait for the results of one or more async threads, while "offer" and "accept" allow asynchronous message passing over channels.

Accept and Offer

Every invocation of the accept tag creates a new asynchronous message queue in the JVM. The body of the accept tag becomes the message consumer; it is called zero or more times, depending on how many messages are delivered to the queue before it is closed. A publisher can close a queue by offering null or an exception to it, or by calling close() on it directly. The consumer can call "halt" to cease processing immediately and discard any remaining queued messages, or close() to initiate an orderly close in which all messages already queued are still processed.
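The lifecycle rules above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Groovity's implementation: MiniChannel, its methods and the CLOSE sentinel are invented names, a sentinel object stands in for offering null (Java's blocking queues reject null elements), and closing by offering an exception is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical model of an accept queue's lifecycle; not Groovity's implementation.
class MiniChannel<T> {
    private static final Object CLOSE = new Object(); // stands in for a publisher offering null
    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile boolean halted = false;

    // Publisher side: offering null closes the channel.
    void offer(T msg) {
        queue.add(msg == null ? CLOSE : msg);
    }

    // Orderly close: messages queued before this call are still delivered.
    void close() {
        queue.add(CLOSE);
    }

    // Immediate stop: remaining queued messages are discarded.
    void halt() {
        halted = true;
    }

    // Consumer side: calls the handler once per message (zero or more times) until closed or halted.
    @SuppressWarnings("unchecked")
    void consume(Consumer<T> handler) {
        try {
            while (!halted) {
                Object next = queue.take();
                if (next == CLOSE) {
                    break;
                }
                handler.accept((T) next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            queue.clear(); // anything left over is discarded
        }
    }

    public static void main(String[] args) {
        MiniChannel<Integer> ch = new MiniChannel<>();
        List<Integer> seen = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            ch.offer(i);
        }
        ch.close();
        ch.consume(v -> {
            seen.add(v);
            if (v == 2) {
                ch.halt(); // messages 3, 4 and 5 are never handled
            }
        });
        System.out.println(seen); // [1, 2]
    }
}
```

Because close() appends its marker behind whatever is already queued, an orderly close naturally drains the backlog, while halt() only needs a flag checked between messages.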

There are two distinct ways to use accept. When used with the "channel" attribute, the queue is attached to a named channel topic and receives every message sent to that channel name; this model supports use cases such as service registration and multiple consumers. When used without a "channel", accept creates a transient, anonymous queue that can only be offered to by direct reference; this is most often used to create a callback channel that is passed inside a message to a service.

The following example combines both styles. First, we create a long-lived named channel consumer that reads the bytes of any URL passed to it and offers them back to an anonymous callback channel. Notice the use of the static start() method to open the channel: channels opened in static start or init methods are closed automatically when the script is destroyed, so they are cleaned up properly after a recompile.

static start(){
	accept(channel:'byteFinder'){ req ->
		req.callback.offer{ req.url.bytes }
	}
}

Now we can create a client of byteFinder. To pass a message to byteFinder we use the offer tag, and in the message payload we create an anonymous callback queue with accept so we can act on the result; in this case, we log the size of the payload and how long it took to retrieve.

def addr = new URL("http://abc.go.com")
long time1 = System.currentTimeMillis()

offer(channel:'byteFinder'){
	[
		url: addr,
		callback: accept{ bytes ->
			long time2 = System.currentTimeMillis()
			log(info:"Got ${bytes.length} bytes for ${addr} after ${time2-time1}")
		}
	]
}
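To show the moving parts of the named-channel-plus-callback pattern, here is a hypothetical plain-Java model (all names invented for illustration). A map from channel name to subscriber queues plays the role of accept(channel:...), and each request carries its own reply queue as the anonymous callback. To keep it self-contained, the service returns a string's length instead of fetching a URL.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical plain-Java model of named channels with per-request callback queues.
class ChannelDemo {
    static final Map<String, List<BlockingQueue<Object>>> CHANNELS = new ConcurrentHashMap<>();

    // Roughly accept(channel:name): register a new queue under a channel name.
    static BlockingQueue<Object> subscribe(String name) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        CHANNELS.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    // Roughly offer(channel:name): deliver the message to every queue on that channel.
    static void publish(String name, Object msg) {
        for (BlockingQueue<Object> queue : CHANNELS.getOrDefault(name, Collections.emptyList())) {
            queue.add(msg);
        }
    }

    // A request carries its own anonymous reply queue, like callback:accept{...}.
    static final class Request {
        final String text;
        final BlockingQueue<Integer> callback;

        Request(String text, BlockingQueue<Integer> callback) {
            this.text = text;
            this.callback = callback;
        }
    }

    // Like the static start() service: one long-lived consumer on a named channel.
    static void startLengthService() {
        BlockingQueue<Object> inbox = subscribe("lengthFinder");
        Thread service = new Thread(() -> {
            try {
                while (true) {
                    Request req = (Request) inbox.take();
                    req.callback.add(req.text.length()); // answer on the caller's private queue
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.setDaemon(true); // do not keep the JVM alive
        service.start();
    }

    // Client side: create a callback queue, send the request, wait for the reply.
    static int requestLength(String text) {
        BlockingQueue<Integer> callback = new LinkedBlockingQueue<>();
        publish("lengthFinder", new Request(text, callback));
        try {
            return callback.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        startLengthService();
        System.out.println(requestLength("groovity")); // 8
    }
}
```

The client never needs to know who serves "lengthFinder", and the service never needs a registry of clients: the reply address travels inside the message.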

Note that message consumers always process messages sequentially, one at a time, so message handlers do not need to be concerned with thread safety. To increase parallelism and throughput for expensive handlers, you can use async inside the accept block so that multiple messages are processed at once.

accept(channel:'byteFinder'){ req ->
	async{
		req.callback.offer{ req.url.bytes }
	}
}
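The "sequential consumer, parallel work" pattern can be sketched in plain Java. In this hypothetical example (names invented; a fixed thread pool stands in for async), one loop takes messages off the queue in order, the way an accept handler does, and hands each one to the pool. Each task counts down a shared latch and then waits on it, so the tasks can only all succeed if they are running at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: sequential dequeue, parallel handling (like async inside accept).
class FanOutDemo {

    static boolean handlersOverlap(int messages) {
        BlockingQueue<Integer> inbox = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            inbox.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(messages);
        CountDownLatch allRunning = new CountDownLatch(messages);
        List<Future<Boolean>> results = new ArrayList<>();
        try {
            // Sequential consumer loop: one message at a time, in order.
            while (inbox.poll() != null) {
                // The expensive part is handed off, so the loop moves straight on to the next message.
                results.add(pool.submit(() -> {
                    allRunning.countDown();
                    // True only if every handler reached this point within the timeout,
                    // which a strictly one-at-a-time consumer of several messages could not do.
                    return allRunning.await(5, TimeUnit.SECONDS);
                }));
            }
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("handlers ran concurrently: " + handlersOverlap(4));
    }
}
```

The consumer loop itself stays single-threaded, so any state it touches directly is still safe; only the state shared by the handed-off tasks needs thread-safe handling.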

Anonymous callback channels are eligible for garbage collection if discarded, but it is best practice to close them reliably in case there is another thread waiting for that callback channel to close. This can be done using a traditional try/finally block:

accept(channel:'byteFinder'){ req ->
	async{
		try{
			req.callback.offer{ req.url.bytes }
		}
		finally{
			req.callback.close()
		}
	}
}

However, channels also provide a convenient close method that takes a closure to execute before closing. It also makes sure that any runtime error is communicated to the channel; with plain try/finally, such errors might get lost:

accept(channel:'byteFinder'){ req ->
	async{
		req.callback.close{
			req.callback.offer{ req.url.bytes }
		}
	}
}
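What a close{...} helper has to guarantee can be sketched in plain Java: run the body, forward any failure to the channel instead of letting it die with the async thread, and always signal close so a waiting consumer is released. This is a hypothetical model, not Groovity's implementation; closeAfter and the CLOSED sentinel are invented names, and a BlockingQueue stands in for the callback channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of a close{...}-style helper; not Groovity's implementation.
class CloseAfterDemo {
    static final Object CLOSED = new Object(); // sentinel meaning "no more messages"

    static void closeAfter(BlockingQueue<Object> channel, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            channel.add(e); // the consumer sees the failure instead of it being lost
        } finally {
            channel.add(CLOSED); // always release anyone waiting for the close
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> callback = new LinkedBlockingQueue<>();
        closeAfter(callback, () -> {
            throw new IllegalStateException("fetch failed");
        });
        System.out.println(callback.take());           // java.lang.IllegalStateException: fetch failed
        System.out.println(callback.take() == CLOSED); // true
    }
}
```

With a bare try/finally, the exception would propagate out of the async block and the consumer would only see the close, with no indication of why no result arrived.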

By default, message queues are unbounded, but you can configure a limit on the queue size along with a policy for what to do when the queue fills up. The default policy blocks the offering thread until a slot in the queue opens up; an offer may specify a timeout to control how long the offering thread is willing to wait to queue its message. Alternatively, a queue can declare a "drop" or "evict" policy, which respectively drops new messages or evicts the oldest queued messages.

accept(q:10,policy:'evict'){...}
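The three overflow policies correspond closely to standard ArrayBlockingQueue operations, which gives a useful mental model. This is a hypothetical plain-Java sketch, not Groovity's implementation; the Policy enum, the method names and the 10 ms timeout are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of bounded-queue overflow policies; names are illustrative only.
class BoundedPolicyDemo {
    enum Policy { BLOCK, DROP, EVICT }

    static <T> boolean offer(ArrayBlockingQueue<T> q, T msg, Policy policy, long timeoutMs)
            throws InterruptedException {
        switch (policy) {
            case BLOCK:
                // Wait up to timeoutMs for a free slot; false means the offer timed out.
                return q.offer(msg, timeoutMs, TimeUnit.MILLISECONDS);
            case DROP:
                // Reject the new message when the queue is full.
                return q.offer(msg);
            case EVICT:
                // Discard the oldest messages until the new one fits.
                while (!q.offer(msg)) {
                    q.poll();
                }
                return true;
            default:
                throw new IllegalArgumentException(policy.toString());
        }
    }

    // Offers 1..count into a queue of the given capacity and returns what remains queued.
    static List<Integer> fill(Policy policy, int capacity, int count) {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(capacity);
        for (int i = 1; i <= count; i++) {
            try {
                offer(q, i, policy, 10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return new ArrayList<>(q); // iteration order is FIFO
    }

    public static void main(String[] args) {
        System.out.println("evict: " + fill(Policy.EVICT, 3, 5)); // evict: [3, 4, 5]
        System.out.println("drop:  " + fill(Policy.DROP, 3, 5));  // drop:  [1, 2, 3]
        System.out.println("block: " + fill(Policy.BLOCK, 3, 5)); // block: [1, 2, 3]
    }
}
```

In this single-threaded demo nothing ever drains the queue, so the blocking policy ends up keeping the same messages as "drop", only after waiting out each timeout.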

Accept tags may also provide a "completed" function, which is called when the channel closes and offers an opportunity to perform cleanup or a final data transformation. Here is an example accept that stops processing once a running total exceeds a threshold. Notice that the handler's last return value is automatically passed into the completed closure.

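def total = 0
def last = 0
accept(
	channel:'widgets',
	completed:{
		// "it" is the last value returned by the handler below
		log(info:"Reached ${it} at ${last}")
	}
){
	last = it
	total += it
	if(total > 100){
		// stop immediately and discard any remaining queued messages
		halt()
	}
	// the handler's return value is what completed receives
	total
}

// publish the numbers 0 through 100 to the widgets channel
(0..100).each{
	offer(channel:'widgets', value:it)
}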
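It is worth working out what this example logs, assuming the messages are handled in the order they were offered. The handler returns the running total 0 + 1 + ... + n = n(n+1)/2: for n = 13 that is 91, and for n = 14 it is 105, the first total above 100. So halt() is called while handling 14, and completed should log "Reached 105 at 14". The plain-Java loop below is a hypothetical stand-in for the channel (a break plays the part of halt()) that checks the arithmetic.

```java
// Hypothetical stand-in for the widgets example: a loop replaces the channel,
// and break plays the part of halt().
class CompletedDemo {

    static String run() {
        int total = 0;
        int last = 0;
        int lastReturn = 0; // what the handler returned most recently
        for (int value = 0; value <= 100; value++) { // like (0..100).each{ offer(...) }
            last = value;
            total += value;
            lastReturn = total;
            if (total > 100) {
                break; // halt(): remaining messages are never handled
            }
        }
        // completed{} receives the handler's last return value
        return "Reached " + lastReturn + " at " + last;
    }

    public static void main(String[] args) {
        System.out.println(run()); // Reached 105 at 14
    }
}
```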

Await

Await blocks a thread until any enclosed async tags return AND/OR any enclosed accept() queues close. Await collects the return values of the enclosed async futures and accept queues and returns them in a list, in the order they were declared. Await can also capture streaming output from async and accept calls and replay it in sequence. The sample program below uses both capabilities: first it fires parallel search requests to Google and Bing and awaits both results, then it uses a channel to fetch each search result URL so it can display the page's original title and content length.

//Step 1: set up a service channel that takes a url and calls back with a map including title and size
def urlRetriever = accept{  msg ->
	async{
		msg.callback.close{
			http(url:msg.url,cookies:'off'){
				handler{
					msg.callback.offer{
						def text = httpResponse.entity.content.text;
						def matcher = text =~ /(?s)(?:<title[^>]*>)(.*?)(?:<\/title>)/
						// fall back to the URL when a page has no <title>
						def title = matcher.find() ? matcher.group(1) : msg.url
						[
							link:msg.url,
							title:title,
							size:text.length()
						]
					}
				}
			}
		}
	}
}

//Step 2: await parallel search requests to scrape google and bing
def searchResults = await{
	async{
		http(url:"https://www.google.com/search?q=groovy&client=safari"){
			header(name:'User-Agent',value:'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A')
			handler{
				(httpResponse.entity.content.text =~ /(?:<h3.*?<a.*?href=")([^\/].*?)(".*?<\/h3>)/).findAll().collect{ it[1] }
			}
		}
	}
	async{
		http(url:"http://www.bing.com/search?q=groovy"){
			handler{
				(httpResponse.entity.content.text =~ /(?:<h2>\s*?<a href=")([^\/].*?)("[^>]*>)/).findAll().collect{ it[1] }
			}
		}
	}
}

//Step 3: set up a result formatting template
def resultTemplate(result){
	<~
		<a href="${result.link}">${result.title}</a> (${result.size})
	~>
}

//Step 4: return a template that waits for all the search result URLs to be processed by the urlRetriever before formatting them
<~
<g:await>
	Google Results:
	<ul>
	<g:each var = "result" in = "${searchResults[0]}">
		<li>
		<g:offer channel="${urlRetriever}" value="[url:result,callback:accept{ stream resultTemplate(it) }]"/>
		</li>
	</g:each>
	</ul>
	Bing Results:
	<ul>
	<g:each var = "result" in = "${searchResults[1]}">
		<li>
		<g:offer channel="${urlRetriever}" value="[url:result,callback:accept{ stream resultTemplate(it) }]"/>
		</li>
	</g:each>
	</ul>
</g:await>
~>
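The ordering guarantee of await (results returned in declaration order, not completion order) has a close plain-Java analogue in ExecutorService.invokeAll, which waits for every task and returns the futures in the same order as the task list. The sketch below is that analogue, not Groovity's implementation; awaitAll is a hypothetical name, and the two tasks stand in for the Google and Bing requests without touching the network.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogue of await{ async{...} async{...} } using ExecutorService.invokeAll.
class AwaitDemo {

    static <T> List<T> awaitAll(List<Callable<T>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<T> results = new ArrayList<>();
            // invokeAll blocks until every task is done and keeps the tasks' order.
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> searches = Arrays.<Callable<String>>asList(
                () -> { Thread.sleep(100); return "google results"; }, // declared first, finishes last
                () -> "bing results");
        System.out.println(awaitAll(searches)); // [google results, bing results]
    }
}
```

This is why the template can safely index searchResults[0] for Google and searchResults[1] for Bing regardless of which request finishes first.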