Skip to content
Browse files

file-atom

  • Loading branch information...
1 parent b46ca06 commit 6bf1f58dea70937b9e775912b3556d47cd298f23 @alandipert committed Dec 20, 2011
View
6 plugins/avout-file/.gitignore
@@ -0,0 +1,6 @@
+pom.xml
+*jar
+/lib/
+/classes/
+.lein-failures
+.lein-deps-sum
View
11 plugins/avout-file/README.markdown
@@ -0,0 +1,11 @@
+# avout-file
+
+An example of a file-based, Avout distributed Atom and Ref.
+
+<h3>License</h3>
+
+Avout is distributed under the Eclipse Public License, the same as Clojure.
+
+<h3>Copyright</h3>
+
+Avout is Copyright © 2011 Alan Dipert and Relevance, Inc
View
5 plugins/avout-file/project.clj
@@ -0,0 +1,5 @@
+(defproject avout-file "0.5.3"
+ :description "An example of an Avout-based, file-backed Distributed Atom and Ref"
+ :dependencies [[org.clojure/clojure "1.3.0"]
+ [avout "0.5.3"]]
+ :dev-dependencies [[lein-clojars "0.7.0"]])
View
34 plugins/avout-file/src/avout/atoms/file.clj
@@ -0,0 +1,34 @@
+(ns avout.atoms.file
+ (:use avout.state
+ [clojure.pprint :only (write)]
+ [clojure.java.io :only (make-reader file delete-file)])
+ (:require [avout.atoms :as atoms])
+ (:import (java.io FileOutputStream
+ OutputStreamWriter
+ PushbackReader)))
+
+(defn- read-file
+ [path]
+ (with-open [rdr (PushbackReader. (make-reader path {}))]
+ (read rdr)))
+
+(deftype FileStateContainer [path]
+
+ StateContainer
+
+ (initStateContainer [this]
+ (if (.exists (file path))
+ (read-file path)))
+
+ (destroyStateContainer [this]
+ (delete-file path))
+
+ (getState [this]
+ (read-file path))
+
+ (setState [this new-value]
+ (with-open [fos (FileOutputStream. path)
+ osw (OutputStreamWriter. fos)]
+ (write new-value :stream osw :pretty false)
+ (.flush osw)
+ (.. fos getFD sync))))
View
45 plugins/avout-file/src/avout/file.clj
@@ -0,0 +1,45 @@
+(ns avout.file
+ (:use avout.core
+ [clojure.java.io :only (file)])
+ (:require [avout.refs :as refs]
+ [avout.atoms :as atoms]
+ avout.atoms.file))
+
+(defn file-atom
+ ([zk-client path init-value & {:keys [validator]}]
+ (doto (atoms/distributed-atom zk-client path (avout.atoms.file.FileStateContainer. path))
+ (set-validator! validator)
+ (.reset init-value)))
+ ([zk-client path]
+ (if (.exists (file path))
+ (atoms/distributed-atom zk-client path (avout.atoms.file.FileStateContainer. path))
+ (throw (RuntimeException. "Either provide a path to an existing distributed atom, or provide an intial value")))))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; file-atom examples
+(comment
+
+ (use 'avout.atoms)
+ (use 'avout.atoms.file :reload-all)
+ (require '[zookeeper :as zk])
+
+ (def zk-client (zk/connect "127.0.0.1"))
+
+ (def a0 (file-atom zk-client "/tmp/a0" {:a 1}))
+ @a0
+ (swap!! a0 assoc :c 3)
+ @a0
+ (swap!! a0 update-in [:a] inc)
+ @a0
+
+ (def a1 (file-atom zk-client "/tmp/a1" 1 :validator pos?))
+ (add-watch a1 :a1 (fn [key ref old-val new-val]
+ (println key ref old-val new-val)))
+ @a1
+ (swap!! a1 inc)
+ @a1
+ (swap!! a1 - 2)
+ (remove-watch a1 :a1)
+ @a1
+
+)
View
72 plugins/avout-file/test/avout/file/test/atom_write_skew_test.clj
@@ -0,0 +1,72 @@
+(ns avout.file.test.atom-write-skew-test
+ (:use avout.file
+ avout.core
+ clojure.test))
+
+ (defn timer [f]
+ (let [start (System/nanoTime)]
+ (f)
+ (/ (double (- (System/nanoTime) start)) 1000000.0)))
+
+ (defn write-skew-test [client a n]
+ (let [times (for [i (range n)] (promise))]
+ (reset!! a [])
+ (dotimes [i n]
+ (future
+ (try
+ (deliver (nth times i) (timer (fn [] (swap!! a #(conj % (inc (count %)))))))
+ (catch Throwable e (println "atom-write-skew-test deliver ex: ") (.printStackTrace e)))))
+ [times a]))
+
+ (defn proces-results [[time-proms a] n]
+ (let [times (doall (sort (map #(deref % 500000 -1) time-proms)))
+ total-time (last times)
+ time-intervals (map (fn [t0 t1] (- t1 t0)) (conj times 0) times)
+ time-interval-per-thread (map / time-intervals (range n 0 -1))
+ avg-time-interval (/ (apply + time-intervals) n)
+ avg-time-interval-per-thread (/ (apply + time-interval-per-thread) n)]
+ {:times times
+ :total-time total-time
+ :time-intervals time-intervals
+ :time-interval-per-thread time-interval-per-thread
+ :a @a
+ :pass? (= (last @a) (count @a))
+ :skew-count (- (count @a) (last @a))
+ :n n
+ :avg-time-interval avg-time-interval
+ :avg-time-interval-per-thread avg-time-interval-per-thread}))
+
+ (defn analyze-write-skew-test [client a n]
+ (try
+ (let [res (proces-results (write-skew-test client a n) n)]
+ (if (and (= (last (:a res)) n) (:pass? res))
+ res
+ (assoc res :pass? false)))
+ (catch Throwable e (println "analyze-write-skew-test: " e (.printStackTrace e)))))
+
+
+(defn tmp-path []
+ (let [tmp (java.io.File/createTempFile "avout_file_atom" ".clj")
+ tmp-path (.getPath tmp)]
+ (.delete tmp)
+ tmp-path))
+
+(deftest write-skew
+ (let [run-count 100
+ max-threads 25
+ client (connect "127.0.0.1")
+ a (file-atom client (tmp-path) [])
+ test-results (atom [])
+ _ (dotimes [i run-count]
+ (let [threads (inc (rand-int max-threads))
+ res (analyze-write-skew-test client a threads)]
+ (print (str i ": threads " threads ", write-skews " (:skew-count res) ", pass? " (:pass? res)))
+ (println ", values " (:a res))
+ (swap! test-results conj res)))
+ avg-time-interval-per-thread (/ (apply + (map :avg-time-interval-per-thread @test-results)) (count @test-results))]
+ (println (str "total writes: " (reduce + (map :n @test-results))))
+ (println (str "total write skews: " (reduce + (map :skew-count @test-results))))
+ (println (str "probability of write skew: " (* 1.0 (/ (reduce + (map :skew-count @test-results))
+ (reduce + (map :n @test-results))))))
+ (println (str "avg-time-interval-per-thread: " avg-time-interval-per-thread " msec"))
+ (is (reduce #(and %1 %2) (map :pass? @test-results)))))

0 comments on commit 6bf1f58

Please sign in to comment.
Something went wrong with that request. Please try again.