Skip to content

Commit

Permalink
Improving performance. Adding partial decoding for records.
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyczek committed Jul 27, 2011
1 parent d33598e commit 90d2d20
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 57 deletions.
150 changes: 94 additions & 56 deletions src/simple_avro/core.clj
Expand Up @@ -33,26 +33,26 @@

(def #^{:private true}
*packers*
{Schema$Type/NULL (fn [#^Schema schema obj] nil)
Schema$Type/BOOLEAN (fn [#^Schema schema obj] (boolean obj))
Schema$Type/INT (fn [#^Schema schema obj] (int obj))
Schema$Type/LONG (fn [#^Schema schema obj] (long obj))
Schema$Type/FLOAT (fn [#^Schema schema obj] (float obj))
Schema$Type/DOUBLE (fn [#^Schema schema obj] (double obj))
Schema$Type/BYTES (fn [#^Schema schema obj] (bytes obj))

Schema$Type/STRING (fn [#^Schema schema obj] (if (string? obj)
{Schema$Type/NULL (fn pack-nil [#^Schema schema obj] nil)
Schema$Type/BOOLEAN (fn pack-boolean [#^Schema schema obj] (boolean obj))
Schema$Type/INT (fn pack-int [#^Schema schema obj] (int obj))
Schema$Type/LONG (fn pack-long [#^Schema schema obj] (long obj))
Schema$Type/FLOAT (fn pack-float [#^Schema schema obj] (float obj))
Schema$Type/DOUBLE (fn pack-double [#^Schema schema obj] (double obj))
Schema$Type/BYTES (fn pack-bytes [#^Schema schema obj] (bytes obj))

Schema$Type/STRING (fn pack-string [#^Schema schema obj] (if (string? obj)
(Utf8. (str obj))
(throw (Exception. (str "'" obj "' is not a string.")))))
Schema$Type/FIXED (fn [#^Schema schema obj] (doto (GenericData$Fixed. schema)
Schema$Type/FIXED (fn pack-fixed [#^Schema schema obj] (doto (GenericData$Fixed. schema)
(.bytes obj)))

Schema$Type/ENUM (fn [#^Schema schema obj]
Schema$Type/ENUM (fn pack-enum [#^Schema schema obj]
(if-let [enum (some #{obj} (.getEnumSymbols schema))]
(GenericData$EnumSymbol. enum)
(throw-with-log "Enum does not define '" obj "'.")))

Schema$Type/UNION (fn [#^Schema schema obj]
Schema$Type/UNION (fn pack-union [#^Schema schema obj]
(loop [schemas (.getTypes schema)]
(if (empty? schemas)
(throw-with-log "No union type defined for object '" obj "'.")
Expand All @@ -63,17 +63,17 @@
rec
(recur (next schemas)))))))

Schema$Type/ARRAY (fn [#^Schema schema obj]
Schema$Type/ARRAY (fn pack-array [#^Schema schema obj]
(let [type-schema (.getElementType schema)
array (GenericData$Array. (count obj) schema)]
(doseq [e obj] (.add array (pack type-schema e)))
array))

Schema$Type/MAP (fn [#^Schema schema obj]
Schema$Type/MAP (fn pack-map [#^Schema schema obj]
(let [type-schema (.getValueType schema)]
(reduce (fn [m [k v]] (assoc m k (pack type-schema v))) {} obj)))

Schema$Type/RECORD (fn [#^Schema schema obj]
Schema$Type/RECORD (fn pack-record [#^Schema schema obj]
(if-let [ks (keys obj)]
(try
(let [record (GenericData$Record. schema)]
Expand Down Expand Up @@ -111,67 +111,93 @@
(throw-with-log "No pack defined for type '" type "'."))))

(def json-encoder
(fn [#^Schema schema obj]
(fn json-encoder-fn [#^Schema schema obj]
(encode-to schema obj
(fn [#^Schema schema #^ByteArrayOutputStream stream]
(JsonEncoder. schema stream))
(fn [#^ByteArrayOutputStream stream]
(.toString stream "UTF-8")))))

(def binary-encoder
(fn [#^Schema schema obj]
(fn binary-encoder-fn [#^Schema schema obj]
(encode-to schema obj
(fn [#^Schema schema #^ByteArrayOutputStream stream]
(BinaryEncoder. stream))
(fn [#^ByteArrayOutputStream stream]
(.. stream toString getBytes)))))
(.. stream toByteArray)))))

;
; Decoding
;

(declare unpack)
(declare unpack-impl json-schema)

(defn- unpack-record-fields
"Unpack only provided fields from record object."
[#^Schema schema #^GenericData$Record obj fields rmap]
(loop [[f & fs] fields m rmap]
(if f
(if (coll? f)
(if-let [#^Schema$Field fd (.getField schema (name (first f)))]
(let [k (.name fd)]
(if (next f)
(recur fs (assoc m k (unpack-record-fields (.schema fd) (.get obj k) (rest f) (m k))))
(recur fs (assoc m k (unpack-impl (.schema fd) (.get obj k))))))
(throw-with-log "No field for name '" (first f) "' exists in schema " (json-schema schema)))
(if-let [#^Schema$Field fd (.getField schema (name f))]
(let [k (.name fd)]
(recur fs (assoc m k (unpack-impl (.schema fd) (.get obj k)))))
(throw-with-log "No field for name '" f "' exists in schema " (json-schema schema))))
m)))

(defn- unpack-all-record-fields
"Unpack entire record object."
[#^Schema schema #^GenericData$Record obj]
(persistent! (reduce (fn [m #^Schema$Field f]
(let [k (.name f)]
(assoc! m k (unpack-impl (.schema f) (.get obj k)))))
(transient {}) (.getFields schema))))

(def #^{:private true}
*unpackers*
{Schema$Type/NULL (fn [#^Schema schema obj] nil)
Schema$Type/BOOLEAN (fn [#^Schema schema obj] (boolean obj))
Schema$Type/INT (fn [#^Schema schema obj] (int obj))
Schema$Type/LONG (fn [#^Schema schema obj] (long obj))
Schema$Type/FLOAT (fn [#^Schema schema obj] (float obj))
Schema$Type/DOUBLE (fn [#^Schema schema obj] (double obj))
Schema$Type/BYTES (fn [#^Schema schema obj] (bytes obj))
Schema$Type/FIXED (fn [#^Schema schema #^GenericData$Fixed obj] (.bytes obj))
Schema$Type/ENUM (fn [#^Schema schema obj] (str obj))

Schema$Type/STRING (fn [#^Schema schema obj] (if (instance? Utf8 obj)
{Schema$Type/NULL (fn unpack-nil [#^Schema schema obj] nil)
Schema$Type/BOOLEAN (fn unpack-boolean [#^Schema schema obj] (boolean obj))
Schema$Type/INT (fn unpack-int [#^Schema schema obj] (int obj))
Schema$Type/LONG (fn unpack-long [#^Schema schema obj] (long obj))
Schema$Type/FLOAT (fn unpack-float [#^Schema schema obj] (float obj))
Schema$Type/DOUBLE (fn unpack-double [#^Schema schema obj] (double obj))
Schema$Type/BYTES (fn unpack-bytes [#^Schema schema obj] (bytes obj))
Schema$Type/FIXED (fn unpack-fixed [#^Schema schema #^GenericData$Fixed obj] (.bytes obj))
Schema$Type/ENUM (fn unpack-enum [#^Schema schema obj] (str obj))

Schema$Type/STRING (fn unpack-stirng [#^Schema schema obj] (if (instance? Utf8 obj)
(str obj)
(throw (Exception. (str "Object '" obj "' is not a Utf8.")))))

Schema$Type/UNION (fn [#^Schema schema obj]
Schema$Type/UNION (fn unpack-union [#^Schema schema obj]
(loop [schemas (.getTypes schema)]
(if (empty? schemas)
(throw-with-log "No union type defined for object '" obj "'.")
(let [rec (try
(unpack (first schemas) obj)
(unpack-impl (first schemas) obj)
(catch Exception e :not-matching-untion-type))]
(if (not= rec :not-matching-untion-type)
rec
(recur (next schemas)))))))

Schema$Type/ARRAY (fn [#^Schema schema obj]
Schema$Type/ARRAY (fn unpack-array [#^Schema schema obj]
(let [type-schema (.getElementType schema)]
(vec (map #(unpack type-schema %) obj))))
(vec (map #(unpack-impl type-schema %) obj))))

Schema$Type/MAP (fn [#^Schema schema obj]
Schema$Type/MAP (fn unpack-map [#^Schema schema obj]
(let [type-schema (.getValueType schema)]
(reduce (fn [m [k v]] (assoc m (str k) (unpack type-schema v))) {} obj)))
(reduce (fn [m [k v]] (assoc m (str k) (unpack-impl type-schema v))) {} obj)))

Schema$Type/RECORD (fn unpack-record [#^Schema schema #^GenericData$Record obj fields]
(if (empty? fields)
(unpack-all-record-fields schema obj)
(unpack-record-fields schema obj fields {})))

Schema$Type/RECORD (fn [#^Schema schema #^GenericData$Record obj]
(reduce (fn [m #^Schema$Field f]
(let [k (.name f)]
(assoc m k (unpack (.schema f) (.get obj k)))))
{} (.getFields schema)))
})

(defn- decode-from
Expand All @@ -180,32 +206,44 @@
decoder (decoder schema obj)]
(.read reader nil decoder)))

(defn- unpack-impl
([#^Schema schema obj]
(unpack-impl schema obj nil))
([#^Schema schema obj fields]
(let [type (.getType schema)
unpacker (*unpackers* type)]
(if unpacker
(if (= type Schema$Type/RECORD)
(unpacker schema obj fields)
(unpacker schema obj))
(throw-with-log "No unpack defined for type '" type "'.")))))

(defn unpack
[schema obj & [decoder]]
([schema obj]
(unpack schema obj nil nil))
([schema obj decoder & [fields]]
(let [#^Schema schema (avro-schema schema)
type (.getType schema)
decode (or decoder (fn [_ obj] obj))
unpacker (*unpackers* type)
obj (decode schema obj)]
(if unpacker
(try
(unpacker schema obj)
(catch Exception e
(throw-with-log "Exception unpacking object '" obj "' for schema '" schema "'." e)))
(throw-with-log "No unpack defined for type '" type "'."))))
(try
(unpack-impl schema obj fields)
(catch Exception e
(throw-with-log "Exception unpacking object '" obj "' for schema '" schema "'." e))))))

(def json-decoder
(fn [#^Schema schema obj]
(fn json-decoder-fn [#^Schema schema obj]
(decode-from schema obj
(fn [#^Schema schema #^String obj]
(fn decode-from-json [#^Schema schema #^String obj]
(let [is (ByteArrayInputStream. (.getBytes obj "UTF-8"))]
(JsonDecoder. schema is))))))

(def binary-decoder
(fn [#^Schema schema obj]
(decode-from schema obj
(fn [#^Schema schema #^bytes obj]
(.createBinaryDecoder (DecoderFactory/defaultFactory) obj nil)))))
(let [factory (DecoderFactory/defaultFactory)
decode-from-binary (fn decode-from-binary
[#^Schema schema #^bytes obj]
(.createBinaryDecoder factory obj nil))]
(fn binary-decoder-fn [#^Schema schema obj]
(decode-from schema obj decode-from-binary))))

;
; Avro schema generation
Expand Down
15 changes: 14 additions & 1 deletion test/simple_avro/core_tests.clj
Expand Up @@ -31,7 +31,8 @@

(defavro-record MyNestedRecord
"f1" avro-int
"f2" avro-string)
"f2" MyRecord
"f3" MyRecord)

(defavro-record List
"value" avro-int
Expand All @@ -43,6 +44,12 @@
"next" {"value" 3
"next" nil}}})

(def nested-record {"f1" 10
"f2" {"f1" 20
"f2" "f2-f2"}
"f3" {"f1" 20
"f2" "f3-f2"}})

(defmacro test-pack-unpack
[name encoder decoder]
`(deftest ~name
Expand Down Expand Up @@ -74,6 +81,12 @@
(is (= (pu# "f2") "test")))

(is (= (unpack List (pack List recursive ~encoder) ~decoder) recursive))

(is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder) nested-record))
(is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [:f1]) {"f1" 10}))
(is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [:f1 :f2]) {"f1" 10 "f2" {"f1" 20 "f2" "f2-f2"}}))
(is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [[:f3 :f2]]) {"f3" {"f2" "f3-f2"}}))
(is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [:f1 [:f3 :f2]]) {"f1" 10 "f3" {"f2" "f3-f2"}}))

))

Expand Down

0 comments on commit 90d2d20

Please sign in to comment.