Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master

Fetching latest commit…

Octocat-spinner-32-eaf2f5

Cannot retrieve the latest commit at this time

..
Octocat-spinner-32 AsyncNetworkIO.scala
Octocat-spinner-32 README.md
README.md

Asynchronous Network I/O with Scala Continuations

13 Aug 2011

In an earlier example, I showed how Scala's delimited continuations can be used to effect non-blocking console I/O. This idea can be extended for more practical uses, such as network communications. Continuation-based Web frameworks have been popularized by Jetty Continuations and the Servlet-3.0 suspendable requests, and are a neat way to scale up software without changing much.

Consider the following socket-based server and client.

SyncServer:

while (true) {
  val server = new java.net.ServerSocket(5554, 1)
  val socket = server.accept()
  server.close()

  val ois = new java.io.ObjectInputStream(socket.getInputStream())
  val request = ois.readObject().toString

  Thread.sleep(250)

  val response = "response(" + request + ")"
  val oos = new java.io.ObjectOutputStream(socket.getOutputStream())
  oos.writeObject(response)
  oos.close()

  ois.close()
  socket.close()
}

The server will continually listen for a single connection, then echo the incoming data back to the client before listening for the next connection. Each loop iteration is blocked for 250 ms to simulate time-consuming I/O.

Client:

val socket = new java.net.Socket(java.net.InetAddress.getLocalHost, 5554)

val oos = new java.io.ObjectOutputStream(socket.getOutputStream())
oos.writeObject("hello")

val ois = new java.io.ObjectInputStream(socket.getInputStream())
println("client " + x + "> " + ois.readObject())
ois.close()

oos.close()

socket.close()

The client simply connects to the server, sends hello, and prints the server's response.

The server has been carefully written to mimic the way a thread-limited Web server works. In this case, it allows only a single connection at a time, and must go through the full request-response cycle before it can accept another connection. Of course this severely limits its utility, as it can handle a simultaneous client load of one, and only about four client connections per second.

The bulk of the workload is the 250 ms I/O simulation. It would be nice if we could remove this from the connection handling logic, and offload the processing to some other thread pool. This would close the server loop tightly, allowing it to be ready to handle a subsequent connection almost immediately.

This can be done while maintaining the imperative nature of the loop using continuations.

AsyncServer:

while (true) reset {
  val server = new java.net.ServerSocket(5554, 1)
  val socket = server.accept()
  server.close()

  suspend()

  val ois = new java.io.ObjectInputStream(socket.getInputStream())
  val request = ois.readObject().toString

  Thread.sleep(250)

  val response = "response(" + request + ")"
  val oos = new java.io.ObjectOutputStream(socket.getOutputStream())

  oos.writeObject(response)
  oos.close()

  ois.close()
  socket.close()
}

def suspend() = shift { k: (Unit => Unit) => queue = k :: queue }

This "asyncronous" server uses the suspend method to capture everything after the initial connection and pass it off to queue to be processed by another thread. This slight difference has a major effect; on my computer I can hit my server with thousands of simultaneous connections with few or no connection refusals.

This approach, appropriately scaled up in a Web application, can allow a single server to handle significantly increased simultaneous client loads. The Jetty documentation shows an example which reduces the resource demand by 86%.

Something went wrong with that request. Please try again.