Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
add build.sbt for simplify usage in multi-modules sbt project
  • Loading branch information
kulikov committed Feb 1, 2013
1 parent d6b59c9 commit 213f3b7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
.idea*
*.log
*.iml
target/
17 changes: 17 additions & 0 deletions build.sbt
@@ -0,0 +1,17 @@
organization := "org.eligosource"

name := "eventsourced"

version := "0.5-SNAPSHOT"

scalaVersion := "2.10.0"

resolvers += "Journalio Repo" at "http://repo.eligotech.com/nexus/content/repositories/eligosource-releases"

libraryDependencies ++= Seq(
"com.google.protobuf" % "protobuf-java" % "2.4.1" % "compile",
"com.typesafe.akka" %% "akka-actor" % "2.1.0" % "compile",
"commons-io" % "commons-io" % "2.3" % "compile",
"journalio" % "journalio" % "1.2" % "compile",
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.4.1" % "compile"
)
Expand Up @@ -16,7 +16,6 @@
package org.eligosource.eventsourced.core

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeoutException

import scala.annotation.tailrec
import scala.concurrent._
Expand Down Expand Up @@ -256,28 +255,28 @@ class EventsourcingExtension(system: ExtendedActorSystem) extends Extension {
}

@tailrec
private def registerChannel(channelId: Int, channelName: Option[String], channel: ActorRef): Unit = {
private def registerChannel(channelId: Int, channelName: Option[String], channel: ActorRef) {
val current = channelsRef.get()
val updated = if (channelName.isDefined) current.add(channelId, channelName.get, channel) else current.add(channelId, channel)
if (!channelsRef.compareAndSet(current, updated)) registerChannel(channelId, channelName, channel)
}

@tailrec
private def registerProcessor(processorId: Int, processor: ActorRef): Unit = {
private def registerProcessor(processorId: Int, processor: ActorRef) {
val current = processorsRef.get()
val updated = current + (processorId -> processor)
if (!processorsRef.compareAndSet(current, updated)) registerProcessor(processorId, processor)
}

@tailrec
private [core] final def deregisterChannel(channelId: Int): Unit = {
private [core] final def deregisterChannel(channelId: Int) {
val current = channelsRef.get()
val updated = current.remove(channelId)
if (!channelsRef.compareAndSet(current, updated)) deregisterChannel(channelId)
}

@tailrec
private [core] final def deregisterProcessor(processorId: Int): Unit = {
private [core] final def deregisterProcessor(processorId: Int) {
val current = processorsRef.get()
val updated = current - processorId
if (!processorsRef.compareAndSet(current, updated)) deregisterProcessor(processorId)
Expand Down
Expand Up @@ -64,14 +64,14 @@ private [eventsourced] class InmemJournal extends Journal {

def storedCounter = counter

private def replay(processorId: Int, channelId: Int, fromSequenceNr: Long, p: Message => Unit): Unit = {
private def replay(processorId: Int, channelId: Int, fromSequenceNr: Long, p: Message => Unit) {
val startKey = Key(processorId, channelId, fromSequenceNr, 0)
val iter = redoMap.from(startKey).iterator.buffered
replay(iter, startKey, p)
}

@scala.annotation.tailrec
private def replay(iter: BufferedIterator[(Key, Any)], key: Key, p: Message => Unit): Unit = {
private def replay(iter: BufferedIterator[(Key, Any)], key: Key, p: Message => Unit) {
if (iter.hasNext) {
val nextEntry = iter.next()
val nextKey = nextEntry._1
Expand Down

0 comments on commit 213f3b7

Please sign in to comment.