Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 3 commits
  • 6 files changed
  • 0 commit comments
  • 1 contributor
Commits on Mar 12, 2012
Dietrich Featherston d2fn schedule tasks sent a fiber to be run sequentially in a separate thre…
…ad - supports re-entrant actor behavior for scalang services
26a18ad
Dietrich Featherston d2fn add timer metric for term encoding 1854ff0
Commits on Mar 13, 2012
Dietrich Featherston d2fn better comments and reorganized code a bit 799f10c
2  pom.xml
View
@@ -5,7 +5,7 @@
<name>scalang</name>
<artifactId>scalang-scala_2.9.1</artifactId>
<packaging>jar</packaging>
- <version>0.11</version>
+ <version>0.12</version>
<properties>
<scala.version>2.9.1</scala.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4 src/main/scala/scalang/node/ErlangNodeClient.scala
View
@@ -52,7 +52,7 @@ class ErlangNodeClient(
pipeline.addLast("erlangFramer", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
pipeline.addLast("encoderFramer", new LengthFieldPrepender(4))
pipeline.addLast("erlangDecoder", new ScalaTermDecoder(peer, typeFactory))
- pipeline.addLast("erlangEncoder", new ScalaTermEncoder)
+ pipeline.addLast("erlangEncoder", new ScalaTermEncoder(peer))
pipeline.addLast("erlangHandler", new ErlangHandler(node, afterHandshake))
pipeline
@@ -72,4 +72,4 @@ class ErlangNodeClient(
}
}
})
-}
+}
4 src/main/scala/scalang/node/ErlangNodeServer.scala
View
@@ -41,7 +41,7 @@ class ErlangNodeServer(node : ErlangNode, typeFactory : TypeFactory) {
pipeline.addLast("erlangFramer", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
pipeline.addLast("encoderFramer", new LengthFieldPrepender(4))
pipeline.addLast("erlangDecoder", new ScalaTermDecoder('server, typeFactory))
- pipeline.addLast("erlangEncoder", new ScalaTermEncoder)
+ pipeline.addLast("erlangEncoder", new ScalaTermEncoder('server))
pipeline.addLast("erlangHandler", new ErlangHandler(node))
pipeline
@@ -50,4 +50,4 @@ class ErlangNodeServer(node : ErlangNode, typeFactory : TypeFactory) {
val channel = bootstrap.bind(new InetSocketAddress(0))
def port = channel.getLocalAddress.asInstanceOf[InetSocketAddress].getPort
-}
+}
57 src/main/scala/scalang/node/ScalaTermEncoder.scala
View
@@ -28,34 +28,39 @@ import java.util.{List => JList}
import scalang.util.ByteArray
import scalang.util.CamelToUnder._
import com.codahale.logula.Logging
+import com.yammer.metrics.scala._
+
+class ScalaTermEncoder(peer: Symbol) extends OneToOneEncoder with Logging with Instrumented {
+
+ val encodeTimer = metrics.timer("encoding", peer.name)
-class ScalaTermEncoder extends OneToOneEncoder with Logging {
-
override def encode(ctx : ChannelHandlerContext, channel : Channel, obj : Any) : Object = {
log.debug("sending msg %s", obj)
- val buffer = ChannelBuffers.dynamicBuffer(512)
- //write distribution header
- buffer.writeBytes(ByteArray(112,131))
- obj match {
- case Tock =>
- buffer.clear
- case LinkMessage(from, to) =>
- encodeObject(buffer, (1, from, to))
- case SendMessage(to, msg) =>
- encodeObject(buffer, (2, Symbol(""), to))
- buffer.writeByte(131)
- encodeObject(buffer, msg)
- case ExitMessage(from, to, reason) =>
- encodeObject(buffer, (3, from, to, reason))
- case UnlinkMessage(from, to) =>
- encodeObject(buffer, (4, from, to))
- case RegSend(from, to, msg) =>
- encodeObject(buffer, (6, from, Symbol(""), to))
- buffer.writeByte(131)
- encodeObject(buffer, msg)
+ encodeTimer.time {
+ val buffer = ChannelBuffers.dynamicBuffer(512)
+ //write distribution header
+ buffer.writeBytes(ByteArray(112,131))
+ obj match {
+ case Tock =>
+ buffer.clear()
+ case LinkMessage(from, to) =>
+ encodeObject(buffer, (1, from, to))
+ case SendMessage(to, msg) =>
+ encodeObject(buffer, (2, Symbol(""), to))
+ buffer.writeByte(131)
+ encodeObject(buffer, msg)
+ case ExitMessage(from, to, reason) =>
+ encodeObject(buffer, (3, from, to, reason))
+ case UnlinkMessage(from, to) =>
+ encodeObject(buffer, (4, from, to))
+ case RegSend(from, to, msg) =>
+ encodeObject(buffer, (6, from, Symbol(""), to))
+ buffer.writeByte(131)
+ encodeObject(buffer, msg)
+ }
+
+ buffer
}
-
- buffer
}
def encodeObject(buffer : ChannelBuffer, obj : Any) : Unit = obj match {
@@ -109,10 +114,10 @@ class ScalaTermEncoder extends OneToOneEncoder with Logging {
writeList(buffer, list, tail)
case Nil =>
buffer.writeByte(106)
- case l : List[Any] =>
- writeList(buffer, l, Nil)
case l : JList[Any] =>
writeJList(buffer, l, Nil)
+ case l : List[Any] =>
+ writeList(buffer, l, Nil)
case b : BigInteger =>
writeBigInt(buffer, b)
case a : Array[Byte] =>
28 src/main/scala/scalang/util/BatchPoolExecutor.scala
View
@@ -15,16 +15,28 @@ class BatchPoolExecutor(path : String,
factory : ThreadFactory) extends
InstrumentedThreadPoolExecutor(path, name, coreSize, maxSize, keepAlive, unit, queue, factory) with
BatchExecutor {
-
+
override def execute(reader : EventReader) {
- execute(new Runnable {
- def run {
- var i = 0
- while (i < reader.size) {
- reader.get(i).run
- i += 1
+
+ // build a job which will run available work sequentially in a separate thread
+
+ val tasks = new ArrayList[Runnable](reader.size())
+ var i = 0
+ while(i < reader.size()) {
+ tasks.add(reader.get(i))
+ i = i + 1
+ }
+
+ val job =
+ new Runnable {
+ def run() {
+ val ti = tasks.iterator()
+ while(ti.hasNext) {
+ ti.next().run()
+ }
}
}
- })
+
+ execute(job)
}
}
4 src/main/scala/scalang/util/ThreadPoolFactory.scala
View
@@ -70,11 +70,11 @@ class DefaultThreadPoolFactory extends ThreadPoolFactory {
queue.executor = pool
pool
} else {
- return batchExecutor
+ batchExecutor
}
}
def createBatchExecutor(reentrant : Boolean) : BatchExecutor = {
createBatchExecutor("pool-" + poolNameCounter.getAndIncrement, reentrant)
}
-}
+}

No commit comments for this range

Something went wrong with that request. Please try again.