Permalink
Browse files

Make historical reads faster (hopefully).

The codec_length and revision fields in protobufs must now
be repeated, and protobuf formats ill do historical reads
by reading the whole thing non-historically, then looking at
the length notations to see how much to read, then doing
another full read of that "chunk".
  • Loading branch information...
1 parent 13e27f1 commit aa3c9ed8f3dcabce5dcab1bb65ca543d9cac3626 @amalloy amalloy committed May 24, 2012
Showing with 69 additions and 15 deletions.
  1. +1 −1 proto/jiraph/incoming_only.proto
  2. +1 −1 proto/jiraph/test.proto
  3. +67 −13 src/jiraph/formats/protobuf.clj
@@ -5,7 +5,7 @@ option java_outer_classname = "Meta";
message Node {
repeated Edge in = 1 [(set) = true];
- optional fixed32 proto_length = 14;
+ repeated fixed32 proto_length = 14;
repeated int64 revisions = 15;
}
View
@@ -16,7 +16,7 @@ message Foo {
optional int32 rev = 8;
- optional fixed32 proto_length = 13;
+ repeated fixed32 proto_length = 13;
repeated int64 revisions = 14;
optional bool codec_reset = 15;
}
@@ -1,27 +1,81 @@
(ns jiraph.formats.protobuf
- (:use [jiraph.formats :only [revisioned-format tidy-node]]
+ (:use [jiraph.formats :only [revisioned-format tidy-node add-revisioning-modes]]
+ [jiraph.codex :only [encode decode] :as codex]
[useful.utils :only [adjoin]]
- [useful.map :only [keyed]])
+ [useful.map :only [keyed update update-each]]
+ [io.core :only [catbytes]]
+ [protobuf.core :only [protodef protobuf-dump]])
(:require [gloss.core :as gloss]
[protobuf.codec :as protobuf]))
+(def ^:private ^:const len-key :proto_length)
+
+(defn- wrap-tidying [f]
+ (fn [opts]
+ (update-each (f opts) [:codec] codex/wrap identity tidy-node)))
+
+(defn- wrap-revisioning-modes [f]
+ (fn [opts]
+ (add-revisioning-modes (f opts))))
+
+(defn- num-bytes-to-encode-length [proto]
+ (let [proto (protodef proto)
+ min (alength (protobuf-dump proto {len-key 0}))
+ max (alength (protobuf-dump proto {len-key Integer/MAX_VALUE}))]
+ (letfn [(check [test msg]
+ (when-not test
+ (throw (Exception. (format "In %s: %s %s"
+ (.getFullName proto) (name len-key) msg)))))]
+ (check (pos? min)
+ "field is required for repeated protobufs")
+ (check (= min max)
+ "must be of type fixed32 or fixed64")
+ max)))
+
;; NB doesn't currently work if you do a full/optimized read with _reset keys.
;; plan is to fall back to non-optimized reads in that case, but support an
;; option to protobuf-codec to never try an optimized read if you expect _resets
+
+(defn- length-for-revision [node goal-revision header-len]
+ (loop [target-len 0,
+ [[rev len] :as pairs] (map vector
+ (:revisions node)
+ (len-key node))]
+ (if (or (not pairs)
+ (> rev goal-revision))
+ target-len
+ (recur (+ len target-len header-len)
+ (next pairs)))))
+
+;; TODO temporarily threw away code for handling non-adjoin reduce-fn
(defn protobuf-format
([proto]
(protobuf-format proto adjoin))
([proto reduce-fn]
+ (when-not (= reduce-fn adjoin)
+ (throw (IllegalArgumentException. (format "Unsupported reduce-fn %s" reduce-fn))))
(let [schema (protobuf/codec-schema proto)
- codec (protobuf/protobuf-codec proto, :repeated true)
+ codec (protobuf/protobuf-codec proto)
proto-format (keyed [codec schema reduce-fn])
- revisioned (revisioned-format (constantly proto-format))]
- (if (= adjoin reduce-fn)
- (let [base-codec (protobuf/protobuf-codec proto)
- codec (gloss/compile-frame base-codec identity tidy-node)
- full-format (keyed [codec schema reduce-fn])]
- (fn [{:keys [revision] :as opts}]
- (if (nil? revision)
- full-format
- (revisioned opts))))
- revisioned))))
+ header-len (num-bytes-to-encode-length proto)]
+ (wrap-revisioning-modes
+ (wrap-tidying
+ (fn [{:keys [revision] :as opts}]
+ (if (nil? revision)
+ proto-format
+ (update proto-format :codec
+ (fn [codec]
+ {:read (fn [bytes]
+ (let [full-node (decode codec bytes)
+ goal-length (length-for-revision full-node
+ revision header-len)]
+ (if (= goal-length (count bytes))
+ full-node
+ (let [read-target (byte-array goal-length)]
+ (System/arraycopy bytes 0 read-target 0 goal-length)
+ (decode codec read-target)))))
+ :write (fn [node]
+ (let [node (assoc node :revisions revision)
+ encoded (encode codec node)]
+ (catbytes (encode codec {len-key (count encoded)})
+ encoded)))})))))))))

0 comments on commit aa3c9ed

Please sign in to comment.