Skip to content
Browse files

bitmap api

  • Loading branch information...
1 parent 7946ecc commit c263b4b99b7959cb90beebb6dfdc5f155c0c1bad @huahaiy committed Jul 12, 2012
Showing with 257 additions and 0 deletions.
  1. +101 −0 src/colap/bitmap.clj
  2. +50 −0 src/colap/bytebuffer.clj
  3. +33 −0 src/colap/cassandra.clj
  4. +73 −0 test/colap/test/bitmap.clj
View
101 src/colap/bitmap.clj
@@ -0,0 +1,101 @@
+(ns colap.bitmap
+ {:doc "Operations on a bitmap, which is implemented by an
+ EWAHCompressedBitmap"
+ :author "Huahai Yang"}
+ (:use [slingshot.slingshot :only [throw+]])
+ (:import [javaewah EWAHCompressedBitmap]
+ [colap.bytebuffer OutputStream InputStream]
+ [java.nio ByteBuffer]
+ [java.io DataOutputStream DataInputStream]))
+
+(defn bitmap->positions
+  "Walk the bitmap's iterator and return the positions whose bit is 1 as
+  a seq."
+  [^EWAHCompressedBitmap bm]
+  (-> bm .iterator iterator-seq))
+
+(defn append-position!
+  "Set, in place, the bits of the given bitmap at the given integer
+  positions and return the bitmap. Positions must be unique, ascending and
+  not below the bitmap's current size in bits; a position that is below it
+  is rejected with a thrown message."
+  ([^EWAHCompressedBitmap bm p]
+   ;; guard first: a position inside the already-written range is an error
+   (when (< p (.sizeInBits bm))
+     (throw+ "Can only append to the end of a bitmap"))
+   (.set bm p)
+   bm)
+  ([^EWAHCompressedBitmap bm p & ps]
+   (append-position! bm p)
+   (doseq [q ps]
+     (append-position! bm q))
+   bm))
+
+(defn positions->bitmap
+  "Load a collection of 1 positions into a new bitmap and return it.
+
+  coll may be unordered and may contain duplicates: it is passed through a
+  sorted set so the positions are appended unique and in ascending order.
+  An empty or nil coll yields an empty bitmap."
+  [coll]
+  ;; reduce rather than apply: with no positions, apply would invoke
+  ;; append-position! with the bitmap alone, an arity it does not define.
+  (reduce append-position! (EWAHCompressedBitmap.) (apply sorted-set coll)))
+
+(defn- insert-to-list
+  "Insert p into the sorted list l at its binary-search insertion point
+  unless it is already there. Mutates l and returns it."
+  [^java.util.List l p]
+  (let [found (java.util.Collections/binarySearch l p)]
+    ;; a miss is reported as (-(insertion point) - 1)
+    (when (neg? found)
+      (.add l (- (inc found)) p)))
+  l)
+
+(defn- remove-from-list
+  "Remove p from the sorted list l if a binary search finds it. Mutates l
+  and returns it."
+  [^java.util.List l p]
+  (let [found (java.util.Collections/binarySearch l p)]
+    ;; a non-negative result is the index of p in l
+    (when-not (neg? found)
+      (.remove l found)))
+  l)
+
+(defn- do-at-positions
+  "Return a new bitmap built by applying f to the position list of bm once
+  for every position in coll.
+
+  bm   - the source bitmap, read through .getPositions
+  f    - a function of (list, position) that edits the list in place, such
+         as insert-to-list or remove-from-list
+  coll - the positions to apply; each one is coerced with int
+
+  The edited list is replayed into a fresh bitmap. When no positions are
+  left, an empty bitmap is returned."
+  [^EWAHCompressedBitmap bm f coll]
+  (let [l (.getPositions bm)]
+    (doseq [p coll] (f l (int p)))
+    ;; reduce, not apply: an empty list would otherwise invoke
+    ;; append-position! with the bitmap alone, which is not a valid arity.
+    (reduce append-position! (EWAHCompressedBitmap.) (seq l))))
+
+(defn insert-position
+  "Return a new bitmap holding the positions of bm plus the given ones.
+
+  A single position is OR-ed in through a one-bit bitmap. Several positions
+  are merged into the position list when there are fewer than 22500 of
+  them, and otherwise OR-ed in as a bitmap built from them.
+  NOTE(review): 22500 looks like an empirically chosen cutoff - confirm."
+  ([^EWAHCompressedBitmap bm p]
+   (let [single (append-position! (EWAHCompressedBitmap.) p)]
+     (.or bm single)))
+  ([^EWAHCompressedBitmap bm p & ps]
+   (let [positions (cons p ps)]
+     (if (>= (count positions) 22500)
+       (.or bm (positions->bitmap positions))
+       (do-at-positions bm insert-to-list positions)))))
+
+(defn remove-position
+  "Return a new bitmap holding the positions of bm minus the given ones.
+
+  Positions that are not set in bm are ignored. Removing every set
+  position yields an empty bitmap."
+  ([^EWAHCompressedBitmap bm p]
+   (let [l (remove-from-list (.getPositions bm) (int p))]
+     ;; reduce, not apply: when nothing is left, apply would call
+     ;; append-position! with the bitmap alone, an arity it does not define.
+     (reduce append-position! (EWAHCompressedBitmap.) (seq l))))
+  ([^EWAHCompressedBitmap bm p & ps]
+   (let [l (.getPositions bm)]
+     (doseq [q (cons p ps)]
+       (remove-from-list l (int q)))
+     (reduce append-position! (EWAHCompressedBitmap.) (seq l)))))
+
+(defn- in-list?
+  "True when a binary search of the sorted list l finds p (coerced with int)."
+  [^java.util.List l p]
+  (not (neg? (java.util.Collections/binarySearch l (int p)))))
+
+(defn position-set?
+  "Return true if every given position of the bitmap is set"
+  ([^EWAHCompressedBitmap bm p]
+   (-> bm .getPositions (in-list? p)))
+  ([^EWAHCompressedBitmap bm p & ps]
+   ;; fetch the position list once and probe it for each requested position
+   (let [positions (.getPositions bm)
+         hit? (fn [q] (in-list? positions q))]
+     (every? hit? (cons p ps)))))
+
+(defn bitmap->bytebuffer
+  "Serialize the given bitmap into a newly allocated ByteBuffer sized by
+  the bitmap's serializedSizeInBytes, and return the buffer rewound."
+  [^EWAHCompressedBitmap bm]
+  (let [bb (ByteBuffer/allocate (.serializedSizeInBytes bm))
+        sink (DataOutputStream. (OutputStream. bb))]
+    (.serialize bm sink)
+    (.rewind bb)))
+
+(defn bytebuffer->bitmap
+  "Deserialize a new bitmap from the given ByteBuffer, reading from the
+  buffer's current position, and return it."
+  [bb]
+  (doto (EWAHCompressedBitmap.)
+    (.deserialize (DataInputStream. (InputStream. bb)))))
+
View
50 src/colap/bytebuffer.clj
@@ -0,0 +1,50 @@
+(ns colap.bytebuffer
+ {:doc "Define IO streams that are directly backed by a given
+ java.nio.ByteBuffer. This namespace needs to be AOT compiled."
+ :author "Huahai Yang"})
+
+;; Generate the class colap.bytebuffer.OutputStream, a java.io.OutputStream
+;; whose only constructor takes a java.nio.ByteBuffer. -init stores that
+;; buffer in the `state` field, and the inherited write overloads are exposed
+;; as writeSuper so that -write can delegate to them.
+(gen-class
+ :name colap.bytebuffer.OutputStream
+ :extends java.io.OutputStream
+ :init init
+ :state state
+ :constructors {[java.nio.ByteBuffer] []}
+ :exposes-methods {write writeSuper}
+ :main false)
+
+(defn -init
+  "Constructor hook for the generated stream classes: pass no arguments to
+  the superclass constructor and keep the given ByteBuffer as the instance
+  state."
+  [bb]
+  (let [super-args []]
+    [super-args bb]))
+
+(defn -write
+ "Implements the write methods of java.io.OutputStream on top of the
+ ByteBuffer held in the instance state.
+
+ With one argument besides this, b is either a byte array, which is handed
+ to the superclass write(byte[]), or a single int that is cast (unchecked)
+ to a byte and put into the ByteBuffer. With three arguments besides this,
+ the call is handed to the superclass write(byte[], int, int)."
+ ([^colap.bytebuffer.OutputStream this b]
+ ;; write(int) and write(byte[]) share this arity, so they are told apart
+ ;; by the runtime type of b ("[B" is the JVM name of the byte[] class).
+ (if (= (type b) (Class/forName "[B"))
+ (.writeSuper this ^bytes b)
+ (.put ^java.nio.ByteBuffer (.state this)
+ (clojure.lang.RT/uncheckedByteCast ^int b))))
+ ([^colap.bytebuffer.OutputStream this ^bytes b o l]
+ (.writeSuper this b ^int o ^int l)))
+
+;; Generate the class colap.bytebuffer.InputStream, a java.io.InputStream
+;; whose only constructor takes a java.nio.ByteBuffer. It names the same init
+;; function and `state` field as the OutputStream class in this namespace,
+;; and the inherited read overloads are exposed as readSuper so that -read
+;; can delegate to them.
+(gen-class
+ :name colap.bytebuffer.InputStream
+ :extends java.io.InputStream
+ :init init
+ :state state
+ :constructors {[java.nio.ByteBuffer] []}
+ :exposes-methods {read readSuper}
+ :main false)
+
+(defn -read
+ "Implements the read methods of java.io.InputStream on top of the
+ ByteBuffer held in the instance state.
+
+ The arity taking only this returns the next byte of the buffer as an int
+ in the range 0-255, or -1 once the buffer has no bytes remaining. The
+ byte-array arities are handed to the superclass implementations."
+ ([^colap.bytebuffer.InputStream this]
+ (let [bb ^java.nio.ByteBuffer (.state this)]
+ (if (.hasRemaining bb)
+ ;; mask with 0xff so a negative byte is reported as 128-255, not as a
+ ;; negative int that could be mistaken for the -1 end-of-stream marker
+ (clojure.lang.RT/uncheckedIntCast (bit-and 0xff ^byte (.get bb)))
+ (int -1))))
+ ([^colap.bytebuffer.InputStream this ^bytes b]
+ (.readSuper this b))
+ ([^colap.bytebuffer.InputStream this ^bytes b o l]
+ (.readSuper this b ^int o ^int l)))
+
View
33 src/colap/cassandra.clj
@@ -0,0 +1,33 @@
+(ns colap.cassandra
+ {:doc "Use Cassandra as data store"
+ :author "Huahai Yang"}
+ (:use [clj-hector.ddl]
+ [clj-hector.core]
+ [colap.bitmap])
+ (:import [me.prettyprint.hector.api Serializer]))
+
+;; Default keyspace handle used by store-bitmap and retrieve-bitmap. The
+;; cluster and keyspace calls are evaluated when this namespace is loaded,
+;; with the hard-coded cluster name "Test Cluster", host "localhost" and
+;; keyspace "testks". The var is dynamic, so it can be rebound with `binding`.
+;; NOTE(review): presumably this contacts Cassandra at load time - confirm.
+(def ^:dynamic *keyspace* (keyspace (cluster "Test Cluster" "localhost") "testks"))
+;; Default column family name used by store-bitmap and retrieve-bitmap.
+(def ^:dynamic *cf-name* "User")
+
+(defn bm-serializer
+  "Return an instance of hector Serializer that can serialize/deserialize
+  an EWAHCompressedBitmap to/from a ByteBuffer.
+
+  Only toByteBuffer and fromByteBuffer are implemented by the proxy.
+  The optional bm argument is ignored; it is accepted only so that existing
+  one-argument callers keep working."
+  ([]
+   (proxy [Serializer] []
+     (toByteBuffer [bm]
+       (bitmap->bytebuffer bm))
+     (fromByteBuffer [bb]
+       (bytebuffer->bitmap bb))))
+  ([bm]
+   (bm-serializer)))
+
+(defn store-bitmap
+  "Store an EWAHCompressedBitmap as a column value.
+
+  dim - the key passed to put after the column family name
+  val - the column name, serialized as a string
+  bm  - the bitmap stored as the value of that column"
+  [dim val bm]
+  ;; :v-serializer needs a Serializer instance, so call bm-serializer
+  ;; instead of handing over the function object itself.
+  (put *keyspace* *cf-name* dim {val bm}
+       :n-serializer :string :v-serializer (bm-serializer bm)))
+
+(defn retrieve-bitmap
+  "Retrieve the EWAHCompressedBitmap stored as the value of column val
+  under key dim, by looking val up in the result of get-columns."
+  [dim val]
+  ;; :v-serializer needs a Serializer instance, so call bm-serializer
+  ;; instead of handing over the function object itself; its argument is
+  ;; ignored, hence nil.
+  (-> (get-columns *keyspace* *cf-name* dim val
+                   :n-serializer :string :v-serializer (bm-serializer nil))
+      (get val)))
View
73 test/colap/test/bitmap.clj
@@ -0,0 +1,73 @@
+(ns colap.test.bitmap
+ {:author "Huahai Yang"}
+ (:use [colap.bitmap]
+ [clojure.test]
+ [clojure.java.io])
+ (:import [javaewah EWAHCompressedBitmap]))
+
+;; Shared test data for the bitmap tests.
+(def ^:private max-pos 2147483583) ;the maximum settable bit (Integer.MAX_VALUE - 64)
+;; Bitmap with only the two extreme positions, 0 and max-pos, set.
+(def ^:private ext-bm (apply append-position! (EWAHCompressedBitmap.) [0 max-pos]))
+;; Starts empty; rnd-bitmaps-fixture fills it before each test and clears it after.
+(def ^:private rnd-bm (EWAHCompressedBitmap.))
+
+;; The random positions loaded into rnd-bm; bound by rnd-bitmaps-fixture.
+(declare ^:private ^:dynamic rnd-coll)
+
+(defn rnd-ints
+  "Return a lazy seq of n random integers, each drawn with rand-int from
+  0 (inclusive) to limit (exclusive)."
+  [n limit]
+  (take n (repeatedly #(rand-int limit))))
+
+(defn rnd-bitmaps-fixture
+ "Test fixture: bind rnd-coll to 100 random positions below max-pos, load
+ them (sorted and de-duplicated) into rnd-bm, run the test function f, and
+ always clear rnd-bm afterwards."
+ [f]
+ (binding [rnd-coll (rnd-ints 100 max-pos)]
+ (try
+ (apply append-position! rnd-bm (apply sorted-set rnd-coll))
+ (f)
+ (finally
+ ;; rnd-bm is a shared var, so empty it even when the test throws
+ (.clear rnd-bm)))))
+
+;; Run the fixture around every deftest in this namespace.
+(use-fixtures :each rnd-bitmaps-fixture)
+
+;; Round trip between bitmaps and position seqs, for both the two-position
+;; bitmap and the random one.
+(deftest from-to-positions
+ (is (= (bitmap->positions ext-bm) [0 max-pos]))
+ ;; positions are expected back sorted and without duplicates
+ (is (= (bitmap->positions rnd-bm) (seq (apply sorted-set rnd-coll))))
+ (is (= ext-bm (positions->bitmap (bitmap->positions ext-bm))))
+ (is (= rnd-bm (positions->bitmap (bitmap->positions rnd-bm)))))
+
+;; insert-position and remove-position, in their single-position and
+;; variadic arities; inserting and then removing unset positions must give
+;; back a bitmap equal to the original.
+(deftest insert-remove
+ (is (= (bitmap->positions (insert-position ext-bm 1)) [0 1 max-pos]))
+ ;; variadic arity, given positions that are not in ascending order
+ (is (= (insert-position ext-bm 3 2 729) (positions->bitmap [0 2 3 729 max-pos])))
+ ;; xs lazily walks the integers from 0 upwards that are NOT set in rnd-bm
+ (let [s (set (bitmap->positions rnd-bm))
+ xs (filter #(nil? (s %)) (range))
+ x (first xs)
+ nf (take 5 xs)]
+ (is (= rnd-bm (remove-position (insert-position rnd-bm x) x)))
+ (is (= rnd-bm (apply remove-position (apply insert-position rnd-bm nf) nf)))))
+
+;; position-set? with one and with several positions, on the fixed bitmap
+;; and on bitmaps returned by insert-position.
+(deftest set-or-not
+ (is (position-set? ext-bm max-pos))
+ (is (position-set? ext-bm 0 max-pos))
+ (is (position-set? (insert-position rnd-bm 289) 289))
+ ;; the queried positions need not be given in ascending order
+ (is (position-set? (insert-position rnd-bm 3 899) 899 3)))
+
+;; Serializing a bitmap to a ByteBuffer and reading it back must give an
+;; equal bitmap.
+(deftest from-to-bytebuffer
+ (is (= ext-bm (bytebuffer->bitmap (bitmap->bytebuffer ext-bm)))
+ "Error serializing bitmap to bytebuffer")
+ (is (= rnd-bm (bytebuffer->bitmap (bitmap->bytebuffer rnd-bm)))
+ "Error serializing bitmap to bytebuffer"))
+
+(defn benchmark
+  "Call (f x) n times under `time` and return one CSV line made of the
+  label i, a comma, the first decimal number found in the captured `time`
+  output, and a newline."
+  [n i f x]
+  (let [report (with-out-str (time (dotimes [_ n] (f x))))
+        [_ elapsed] (re-find #"[^\d]+(\d+\.\d+).+" report)]
+    (str i "," elapsed "\n")))
+
+(defn run-benchmark
+  "Append benchmark lines for f to the output o: for each of 1000 random
+  sizes below 40000, time one call of f on that many random positions
+  below max-pos and write the resulting line."
+  [f o]
+  (with-open [w (writer o :append true)]
+    (doseq [size (rnd-ints 1000 40000)]
+      (let [input (rnd-ints size max-pos)]
+        (.write w (benchmark 1 size f input))))
+    ;(doseq [x (rnd-ints 1000 max-pos)]
+    ;(.write w (benchmark 100 x f [x])))
+    (.flush w)))

0 comments on commit c263b4b

Please sign in to comment.
Something went wrong with that request. Please try again.