Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Mar 24, 2012
0 parents commit ad4f9f7
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 0 deletions.
1 change: 1 addition & 0 deletions .dir-locals.el
@@ -0,0 +1 @@
((clojure-mode . ((clojure-swank-command . "lein2 jack-in %s"))))
10 changes: 10 additions & 0 deletions .gitignore
@@ -0,0 +1,10 @@
/target
/lib
/classes
/checkouts
pom.xml
*.jar
*.class
.lein-deps-sum
.lein-failures
.lein-plugins
3 changes: 3 additions & 0 deletions .travis.yml
@@ -0,0 +1,3 @@
language: clojure
lein: lein2
script: lein2 all test
73 changes: 73 additions & 0 deletions README.md
@@ -0,0 +1,73 @@
# Romulan, LMAX Disruptor in Clojure Embrace

ClojureWerkz Romulan is a Clojure DSL for [LMAX Disruptor](http://code.google.com/p/disruptor/).


## Documentation & Examples

ClojureWerkz Romulan is a very young project and until 1.0 is released and documentation guides are written,
it may be challenging to use for anyone except the author. For code examples, see our test
suite.

Once the library matures, we will update this document.

## Community

Once APIs stabilize, we will start a mailing list.

To subscribe for announcements of releases, important changes and so on, please follow [@ClojureWerkz](https://twitter.com/#!/clojurewerkz) on Twitter.


## This is a Work In Progress

This is a young project that is still very much a work in progress.



## Maven Artifacts

### Snapshots

If you are comfortable with using snapshots, snapshot artifacts are [released to Clojars](https://clojars.org/clojurewerkz/romulan) every few days.

With Leiningen:

[clojurewerkz/romulan "0.1.0-SNAPSHOT"]


With Maven:

<dependency>
<groupId>clojurewerkz</groupId>
<artifactId>romulan</artifactId>
<version>0.1.0-SNAPSHOT</version>
</dependency>


## Supported Clojure versions

Romulan is built from the ground up for Clojure 1.3 and up.


## Continuous Integration

[![Continuous Integration status](https://secure.travis-ci.org/clojurewerkz/romulan.png)](http://travis-ci.org/clojurewerkz/romulan)

CI is hosted by [travis-ci.org](http://travis-ci.org)


## Development

Romulan uses [Leiningen 2](https://github.com/technomancy/leiningen/blob/master/doc/TUTORIAL.md). Make sure you have it installed and then run tests against Clojure 1.3.0 and 1.4.0[-beta5] using

lein2 all test

Then create a branch and make your changes on it. Once you are done with your changes and all tests pass, submit
a pull request on Github.


## License

Copyright © 2012 Michael S. Klishin, Alex Petrov

Distributed under the Eclipse Public License, the same as Clojure.
17 changes: 17 additions & 0 deletions project.clj
@@ -0,0 +1,17 @@
(defproject clojurewerkz/romulan "0.1.0-SNAPSHOT"
:description "LMAX Disruptor in Clojure embrace"
:min-lein-version "2.0.0"
:url "http://github.com/clojurewerkz/romulan"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.3.0"]
[com.googlecode.disruptor/disruptor "2.8"]]
:profiles {:1.4 {:dependencies [[org.clojure/clojure "1.4.0-beta5"]]}}
:aliases { "all" ["with-profile" "dev:dev,1.4"] }
:source-paths ["src/clojure"]
:java-source-paths ["src/java"]
:repositories {"clojure-releases" "http://build.clojure.org/releases",
"sonatype" {:url "http://oss.sonatype.org/content/repositories/releases",
:snapshots false,
:releases {:checksum :fail, :update :always}}}
:warn-on-reflection true)
94 changes: 94 additions & 0 deletions src/clojure/clojurewerkz/romulan/dsl.clj
@@ -0,0 +1,94 @@
(ns clojurewerkz.romulan.dsl
(:import [java.util.concurrent Executors ExecutorService]
[com.lmax.disruptor EventFactory EventTranslator EventHandler ClaimStrategy WaitStrategy]
[com.lmax.disruptor.dsl Disruptor]))

;;
;; Implementation
;;

(defn- fn->event-factory
"Creates a new Disruptor EventFactory that uses provided function"
[f]
(reify com.lmax.disruptor.EventFactory
(newInstance [this]
(f))))

(defn- fn->event-handler
"Creates a new Disruptor EventHandler that uses provided function"
[f]
(reify com.lmax.disruptor.EventHandler
(onEvent [this event sequence end-of-batch?]
(f event sequence end-of-batch?))))

(defn- fn->event-translator
"Creates a new Disruptor EventTranslator that uses provided function"
[f]
(reify com.lmax.disruptor.EventTranslator
(translateTo [this event sequence]
(f event sequence))))



;;
;; API
;;

(defprotocol EFFactory
(^com.lmax.disruptor.EventFactory event-factory [arg] "Creates Disruptor EventFactory from arg"))

(extend-protocol EFFactory
EventFactory
(event-factory [^EventFactory arg]
arg)

clojure.lang.IFn
(event-factory [^clojure.lang.IFn arg]
(fn->event-factory arg)))


(defprotocol EventHandlerFactory
(^com.lmax.disruptor.EventHandler event-handler [arg] "Creates an EventHandler from arg"))

(extend-protocol EventHandlerFactory
EventHandler
(event-handler [arg]
arg)

clojure.lang.IFn
(event-handler [^clojure.lang.IFn arg]
(fn->event-handler arg)))


(defprotocol EventTranslatorFactory
(^com.lmax.disruptor.EventTranslator event-translator [arg] "Creates an EventTranslator from arg"))

(extend-protocol EventTranslatorFactory
EventTranslator
(event-translator [arg]
arg)

clojure.lang.IFn
(event-translator [^clojure.lang.IFn arg]
(fn->event-translator arg)))




(defn ^com.lmax.disruptor.ExceptionHandler exception-handler
"Instantiates a new Disruptor exception handler that uses provided functions (:on-event, :on-start, :on-shutdown)"
[&{ :keys [on-event on-start on-shutdown] }]
(reify com.lmax.disruptor.ExceptionHandler
(handleEventException [this ex sequence event]
(on-event ex sequence event))
(handleOnStartException [this ex]
(on-start ex))
(handleOnShutdownException [this ex]
(on-shutdown ex))))


(defn ^com.lmax.disruptor.dsl.Disruptor disruptor
([ef ^long ring-buffer-size ^ExecutorService executor]
(Disruptor. (event-factory ef) ring-buffer-size executor))
([ef ^ExecutorService executor ^ClaimStrategy cs ^WaitStrategy ws]
(Disruptor. (event-factory ef) executor cs ws)))
Empty file added src/java/.gitkeep
Empty file.
75 changes: 75 additions & 0 deletions test/clojurewerkz/romulan/dsl_test.clj
@@ -0,0 +1,75 @@
(ns clojurewerkz.romulan.dsl-test
(:use clojure.test clojurewerkz.romulan.dsl)
(:import [java.util.concurrent Executors ExecutorService CountDownLatch]
[com.lmax.disruptor EventHandler EventTranslator ExceptionHandler SingleThreadedClaimStrategy BlockingWaitStrategy]
[com.lmax.disruptor.dsl Disruptor]))

(deftest test-event-factory
(let [f (constantly 42)
ef (event-factory f)]
(is (instance? com.lmax.disruptor.EventFactory ef))
(is (= 42 (.newInstance ef)))))

(deftest test-handler-factory
(let [state (atom {})
fn (fn [event sequence end-of-batch?]
(swap! state assoc :event event :sequence sequence :eob end-of-batch?))
eh (event-handler fn)]
(is (instance? EventHandler eh))
(.onEvent eh { :type "dummy" } 50 true)
(is (= "dummy" (get-in @state [:event :type])))
(is (= 50 (:sequence @state)))
(is (:eob @state))))

(deftest test-event-translator
(let [et (event-translator (fn [x _] x))]
(is (instance? com.lmax.disruptor.EventTranslator et))
(are [input output] (is (= output (.translateTo et input 0)))
1 1
"2" "2"
3.0 3.0
4/5 4/5
'five 'five
:six :six)))

(deftest test-exception-handler-factory
(letfn [(event-ehf [ex l evt])
(startup-ehf [ex])
(shutdown-ehf [ex])]
(is (instance? ExceptionHandler (exception-handler :on-event event-ehf
:on-start startup-ehf
:on-shutdown shutdown-ehf)))))

(deftest test-basic-dsl-example1
(let [^Disruptor d (disruptor (event-factory (constantly 99))
(Executors/newFixedThreadPool 2)
(SingleThreadedClaimStrategy. 4)
(BlockingWaitStrategy.))]
(.start d)
(.shutdown d)))

(deftest test-basic-dsl-example2
(let [^Disruptor d (disruptor (constantly 99)
256
(Executors/newFixedThreadPool 2))]
(.start d)
(.shutdown d)))


(deftest test-publishing-and-handling-events-using-java-interop-example1
(let [n 87
latch (CountDownLatch. n)
eh (event-handler (fn [event sequence end-of-batch?]
))
et (event-translator (fn [x _]
(.countDown latch)
x))
^Disruptor d (disruptor (constantly 99)
256
(Executors/newFixedThreadPool 2))]
(.handleEventsWith d (into-array EventHandler [eh]))
(.start d)
(dotimes [i n]
(.publishEvent d et))
(.await latch)
(.shutdown d)))

0 comments on commit ad4f9f7

Please sign in to comment.