Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base: d33598e43b
...
compare: 48747fc0c2
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 6 files changed
  • 0 commit comments
  • 1 contributor
View
4 README.md
@@ -40,7 +40,9 @@ string names for type references, for example:
(assert (= contact (unpack Contact packed <optional decoder>)))
_pack_ serializes objects into generic Avro objects. For json or binary serialization provide an optional _json-encoder_ or _binary-encoder_.
-Use equivalent decoder to de-serialize objects using _unpack_.
+Use equivalent decoder to de-serialize objects using _unpack_. _unpack_ takes an optional list of fields to deserialize from a record.
+Use single field names or path vectors for nested records, for example _[:first [:address :city]]_ will deserialize only
+the two fields first and city. If no fields are provided, the entire record is deserialized.
### Custom types API
View
2  project.clj
@@ -1,4 +1,4 @@
-(defproject simple-avro "0.0.4-1.4-SNAPSHOT"
+(defproject simple-avro "0.0.5-1.4-SNAPSHOT"
:description "Clojure wrapper around Avro schema and serialization."
:url "http://github.com/asmyczek/simple-avro"
:dependencies [[org.clojure/clojure "1.2.0"]
View
150 src/simple_avro/core.clj
@@ -33,26 +33,26 @@
(def #^{:private true}
*packers*
- {Schema$Type/NULL (fn [#^Schema schema obj] nil)
- Schema$Type/BOOLEAN (fn [#^Schema schema obj] (boolean obj))
- Schema$Type/INT (fn [#^Schema schema obj] (int obj))
- Schema$Type/LONG (fn [#^Schema schema obj] (long obj))
- Schema$Type/FLOAT (fn [#^Schema schema obj] (float obj))
- Schema$Type/DOUBLE (fn [#^Schema schema obj] (double obj))
- Schema$Type/BYTES (fn [#^Schema schema obj] (bytes obj))
-
- Schema$Type/STRING (fn [#^Schema schema obj] (if (string? obj)
+ {Schema$Type/NULL (fn pack-nil [#^Schema schema obj] nil)
+ Schema$Type/BOOLEAN (fn pack-boolean [#^Schema schema obj] (boolean obj))
+ Schema$Type/INT (fn pack-int [#^Schema schema obj] (int obj))
+ Schema$Type/LONG (fn pack-long [#^Schema schema obj] (long obj))
+ Schema$Type/FLOAT (fn pack-float [#^Schema schema obj] (float obj))
+ Schema$Type/DOUBLE (fn pack-double [#^Schema schema obj] (double obj))
+ Schema$Type/BYTES (fn pack-bytes [#^Schema schema obj] (bytes obj))
+
+ Schema$Type/STRING (fn pack-string [#^Schema schema obj] (if (string? obj)
(Utf8. (str obj))
(throw (Exception. (str "'" obj "' is not a string.")))))
- Schema$Type/FIXED (fn [#^Schema schema obj] (doto (GenericData$Fixed. schema)
+ Schema$Type/FIXED (fn pack-fixed [#^Schema schema obj] (doto (GenericData$Fixed. schema)
(.bytes obj)))
- Schema$Type/ENUM (fn [#^Schema schema obj]
+ Schema$Type/ENUM (fn pack-enum [#^Schema schema obj]
(if-let [enum (some #{obj} (.getEnumSymbols schema))]
(GenericData$EnumSymbol. enum)
(throw-with-log "Enum does not define '" obj "'.")))
- Schema$Type/UNION (fn [#^Schema schema obj]
+ Schema$Type/UNION (fn pack-union [#^Schema schema obj]
(loop [schemas (.getTypes schema)]
(if (empty? schemas)
(throw-with-log "No union type defined for object '" obj "'.")
@@ -63,17 +63,17 @@
rec
(recur (next schemas)))))))
- Schema$Type/ARRAY (fn [#^Schema schema obj]
+ Schema$Type/ARRAY (fn pack-array [#^Schema schema obj]
(let [type-schema (.getElementType schema)
array (GenericData$Array. (count obj) schema)]
(doseq [e obj] (.add array (pack type-schema e)))
array))
- Schema$Type/MAP (fn [#^Schema schema obj]
+ Schema$Type/MAP (fn pack-map [#^Schema schema obj]
(let [type-schema (.getValueType schema)]
(reduce (fn [m [k v]] (assoc m k (pack type-schema v))) {} obj)))
- Schema$Type/RECORD (fn [#^Schema schema obj]
+ Schema$Type/RECORD (fn pack-record [#^Schema schema obj]
(if-let [ks (keys obj)]
(try
(let [record (GenericData$Record. schema)]
@@ -111,7 +111,7 @@
(throw-with-log "No pack defined for type '" type "'."))))
(def json-encoder
- (fn [#^Schema schema obj]
+ (fn json-encoder-fn [#^Schema schema obj]
(encode-to schema obj
(fn [#^Schema schema #^ByteArrayOutputStream stream]
(JsonEncoder. schema stream))
@@ -119,59 +119,85 @@
(.toString stream "UTF-8")))))
(def binary-encoder
- (fn [#^Schema schema obj]
+ (fn binary-encoder-fn [#^Schema schema obj]
(encode-to schema obj
(fn [#^Schema schema #^ByteArrayOutputStream stream]
(BinaryEncoder. stream))
(fn [#^ByteArrayOutputStream stream]
- (.. stream toString getBytes)))))
+ (.. stream toByteArray)))))
;
; Decoding
;
-(declare unpack)
+(declare unpack-impl json-schema)
+
+(defn- unpack-record-fields
+ "Unpack only provided fields from record object."
+ [#^Schema schema #^GenericData$Record obj fields rmap]
+ (loop [[f & fs] fields m rmap]
+ (if f
+ (if (coll? f)
+ (if-let [#^Schema$Field fd (.getField schema (name (first f)))]
+ (let [k (.name fd)]
+ (if (next f)
+ (recur fs (assoc m k (unpack-record-fields (.schema fd) (.get obj k) (rest f) (m k))))
+ (recur fs (assoc m k (unpack-impl (.schema fd) (.get obj k))))))
+ (throw-with-log "No field for name '" (first f) "' exists in schema " (json-schema schema)))
+ (if-let [#^Schema$Field fd (.getField schema (name f))]
+ (let [k (.name fd)]
+ (recur fs (assoc m k (unpack-impl (.schema fd) (.get obj k)))))
+ (throw-with-log "No field for name '" f "' exists in schema " (json-schema schema))))
+ m)))
+
+(defn- unpack-all-record-fields
+ "Unpack entire record object."
+ [#^Schema schema #^GenericData$Record obj]
+ (persistent! (reduce (fn [m #^Schema$Field f]
+ (let [k (.name f)]
+ (assoc! m k (unpack-impl (.schema f) (.get obj k)))))
+ (transient {}) (.getFields schema))))
(def #^{:private true}
*unpackers*
- {Schema$Type/NULL (fn [#^Schema schema obj] nil)
- Schema$Type/BOOLEAN (fn [#^Schema schema obj] (boolean obj))
- Schema$Type/INT (fn [#^Schema schema obj] (int obj))
- Schema$Type/LONG (fn [#^Schema schema obj] (long obj))
- Schema$Type/FLOAT (fn [#^Schema schema obj] (float obj))
- Schema$Type/DOUBLE (fn [#^Schema schema obj] (double obj))
- Schema$Type/BYTES (fn [#^Schema schema obj] (bytes obj))
- Schema$Type/FIXED (fn [#^Schema schema #^GenericData$Fixed obj] (.bytes obj))
- Schema$Type/ENUM (fn [#^Schema schema obj] (str obj))
-
- Schema$Type/STRING (fn [#^Schema schema obj] (if (instance? Utf8 obj)
+ {Schema$Type/NULL (fn unpack-nil [#^Schema schema obj] nil)
+ Schema$Type/BOOLEAN (fn unpack-boolean [#^Schema schema obj] (boolean obj))
+ Schema$Type/INT (fn unpack-int [#^Schema schema obj] (int obj))
+ Schema$Type/LONG (fn unpack-long [#^Schema schema obj] (long obj))
+ Schema$Type/FLOAT (fn unpack-float [#^Schema schema obj] (float obj))
+ Schema$Type/DOUBLE (fn unpack-double [#^Schema schema obj] (double obj))
+ Schema$Type/BYTES (fn unpack-bytes [#^Schema schema obj] (bytes obj))
+ Schema$Type/FIXED (fn unpack-fixed [#^Schema schema #^GenericData$Fixed obj] (.bytes obj))
+ Schema$Type/ENUM (fn unpack-enum [#^Schema schema obj] (str obj))
+
+ Schema$Type/STRING (fn unpack-stirng [#^Schema schema obj] (if (instance? Utf8 obj)
(str obj)
(throw (Exception. (str "Object '" obj "' is not a Utf8.")))))
- Schema$Type/UNION (fn [#^Schema schema obj]
+ Schema$Type/UNION (fn unpack-union [#^Schema schema obj]
(loop [schemas (.getTypes schema)]
(if (empty? schemas)
(throw-with-log "No union type defined for object '" obj "'.")
(let [rec (try
- (unpack (first schemas) obj)
+ (unpack-impl (first schemas) obj)
(catch Exception e :not-matching-untion-type))]
(if (not= rec :not-matching-untion-type)
rec
(recur (next schemas)))))))
- Schema$Type/ARRAY (fn [#^Schema schema obj]
+ Schema$Type/ARRAY (fn unpack-array [#^Schema schema obj]
(let [type-schema (.getElementType schema)]
- (vec (map #(unpack type-schema %) obj))))
+ (vec (map #(unpack-impl type-schema %) obj))))
- Schema$Type/MAP (fn [#^Schema schema obj]
+ Schema$Type/MAP (fn unpack-map [#^Schema schema obj]
(let [type-schema (.getValueType schema)]
- (reduce (fn [m [k v]] (assoc m (str k) (unpack type-schema v))) {} obj)))
+ (reduce (fn [m [k v]] (assoc m (str k) (unpack-impl type-schema v))) {} obj)))
+
+ Schema$Type/RECORD (fn unpack-record [#^Schema schema #^GenericData$Record obj fields]
+ (if (empty? fields)
+ (unpack-all-record-fields schema obj)
+ (unpack-record-fields schema obj fields {})))
- Schema$Type/RECORD (fn [#^Schema schema #^GenericData$Record obj]
- (reduce (fn [m #^Schema$Field f]
- (let [k (.name f)]
- (assoc m k (unpack (.schema f) (.get obj k)))))
- {} (.getFields schema)))
})
(defn- decode-from
@@ -180,32 +206,44 @@
decoder (decoder schema obj)]
(.read reader nil decoder)))
+(defn- unpack-impl
+ ([#^Schema schema obj]
+ (unpack-impl schema obj nil))
+ ([#^Schema schema obj fields]
+ (let [type (.getType schema)
+ unpacker (*unpackers* type)]
+ (if unpacker
+ (if (= type Schema$Type/RECORD)
+ (unpacker schema obj fields)
+ (unpacker schema obj))
+ (throw-with-log "No unpack defined for type '" type "'.")))))
+
(defn unpack
- [schema obj & [decoder]]
+ ([schema obj]
+ (unpack schema obj nil nil))
+ ([schema obj decoder & [fields]]
(let [#^Schema schema (avro-schema schema)
- type (.getType schema)
decode (or decoder (fn [_ obj] obj))
- unpacker (*unpackers* type)
obj (decode schema obj)]
- (if unpacker
- (try
- (unpacker schema obj)
- (catch Exception e
- (throw-with-log "Exception unpacking object '" obj "' for schema '" schema "'." e)))
- (throw-with-log "No unpack defined for type '" type "'."))))
+ (try
+ (unpack-impl schema obj fields)
+ (catch Exception e
+ (throw-with-log "Exception unpacking object '" obj "' for schema '" schema "'." e))))))
(def json-decoder
- (fn [#^Schema schema obj]
+ (fn json-decoder-fn [#^Schema schema obj]
(decode-from schema obj
- (fn [#^Schema schema #^String obj]
+ (fn decode-from-json [#^Schema schema #^String obj]
(let [is (ByteArrayInputStream. (.getBytes obj "UTF-8"))]
(JsonDecoder. schema is))))))
(def binary-decoder
- (fn [#^Schema schema obj]
- (decode-from schema obj
- (fn [#^Schema schema #^bytes obj]
- (.createBinaryDecoder (DecoderFactory/defaultFactory) obj nil)))))
+ (let [factory (DecoderFactory/defaultFactory)
+ decode-from-binary (fn decode-from-binary
+ [#^Schema schema #^bytes obj]
+ (.createBinaryDecoder factory obj nil))]
+ (fn binary-decoder-fn [#^Schema schema obj]
+ (decode-from schema obj decode-from-binary))))
;
; Avro schema generation
View
75 src/simple_avro/utils.clj
@@ -6,7 +6,9 @@
(org.apache.avro.file CodecFactory
DataFileStream
DataFileWriter
- DataFileReader)
+ DataFileReader
+ SeekableInput
+ SeekableFileInput)
(org.apache.avro.generic GenericDatumWriter
GenericDatumReader)
(java.io InputStream OutputStream File
@@ -36,13 +38,14 @@
(defn avro-spit
"Write to Avro data file."
[f schema objs & [meta]]
- (let [schema (avro-schema schema)
- writer (DataFileWriter. (GenericDatumWriter. schema))]
+ (let [#^File file (file f)
+ #^Schema schema (avro-schema schema)
+ writer (DataFileWriter. (GenericDatumWriter. schema))]
(try
(.setCodec writer (CodecFactory/deflateCodec 6))
(doseq [[k v] meta]
(.setMeta writer (str k) (str v)))
- (.create writer schema (file f))
+ (.create writer schema file)
(doseq [o objs]
(.append writer (pack schema o)))
(finally
@@ -51,9 +54,10 @@
(defn avro-slurp
"Read data from Avro data file."
[f]
- (let [reader (DataFileReader. (file f) (GenericDatumReader.))
- schema (.getSchema reader)
- read-next (fn read-next [reader]
+ (let [#^File file (file f)
+ #^DataFileReader reader (DataFileReader. file (GenericDatumReader.))
+ #^Schema schema (.getSchema reader)
+ read-next (fn read-next [#^DataFileReader reader]
(lazy-seq
(if (.hasNext reader)
(cons (unpack schema (.next reader)) (read-next reader))
@@ -65,7 +69,8 @@
(defn avro-slurp-meta
"Read meta from Avro data file."
[f & keys]
- (let [reader (DataFileReader. (file f) (GenericDatumReader.))]
+ (let [#^File file (file f)
+ reader (DataFileReader. file (GenericDatumReader.))]
(loop [[k & ks] keys mta {}]
(if (nil? k)
mta
@@ -75,7 +80,7 @@
; Custom file handling
;
-; output stream
+; Output stream
(defmulti #^{:private true}
output-stream class)
@@ -107,10 +112,10 @@
(defn avro-writer
[os schema]
- (let [schema (avro-schema schema)
- os (output-stream os)
- writer (doto (DataFileWriter. (GenericDatumWriter. schema))
- (.create schema os))]
+ (let [#^Schema schema (avro-schema schema)
+ #^OutputStream os (output-stream os)
+ writer (doto (DataFileWriter. (GenericDatumWriter. schema))
+ (.create schema os))]
(reify
Writer
(write [this obj]
@@ -123,36 +128,40 @@
; Input stream
(defmulti #^{:private true}
- input-stream class)
+ seekable-input class)
-(defmethod input-stream InputStream [#^OutputStream os]
- os)
+(defmethod seekable-input SeekableInput [#^OutputStream si]
+ si)
-(defmethod input-stream File [#^File f]
- (BufferedInputStream. (FileInputStream. f)))
+(defmethod seekable-input File [#^File f]
+ (SeekableFileInput. f))
-(defmethod input-stream String [#^String f]
- (input-stream (File. f)))
+(defmethod seekable-input String [#^String f]
+ (seekable-input (File. f)))
-(defmethod input-stream URL [#^URL f]
- (input-stream (.getPath f)))
+(defmethod seekable-input URL [#^URL f]
+ (seekable-input (.getPath f)))
-(defmethod input-stream URI [#^URI f]
- (input-stream (.toURL f)))
+(defmethod seekable-input URI [#^URI f]
+ (seekable-input (.toURL f)))
(defprotocol Reader
"General reader protocol."
- (read-next [this] "Read next element.")
- (has-next [this] "Checks if more element exists."))
+ (read-next [this] "Read next element.")
+ (has-next [this] "Checks if more element exists.")
+ (sync-pos [this pos] "Sync reader position.")
+ (position [this] "Returns approximate position between 0.0 and 1.0."))
(defn avro-reader
- [is]
- (let [is (input-stream is)
- dfs (DataFileStream. is (GenericDatumReader.))
- schema (.getSchema dfs)]
+ [si & [fields]]
+ (let [#^SeekableInput si (seekable-input si)
+ #^DataFileReader dfr (DataFileReader. si (GenericDatumReader.))
+ #^Schema schema (.getSchema dfr)]
(reify
Reader
- (read-next [this] (unpack schema (.next dfs)))
- (has-next [this] (.hasNext dfs))
+ (read-next [this] (unpack schema (.next dfr) nil fields))
+ (has-next [this] (.hasNext dfr))
+ (sync-pos [this pos] (.sync dfr pos))
+ (position [this] (.tell si))
Closable
- (close [this] (.close dfs)))))
+ (close [this] (.close dfr)))))
View
15 test/simple_avro/core_tests.clj
@@ -31,7 +31,8 @@
(defavro-record MyNestedRecord
"f1" avro-int
- "f2" avro-string)
+ "f2" MyRecord
+ "f3" MyRecord)
(defavro-record List
"value" avro-int
@@ -43,6 +44,12 @@
"next" {"value" 3
"next" nil}}})
+(def nested-record {"f1" 10
+ "f2" {"f1" 20
+ "f2" "f2-f2"}
+ "f3" {"f1" 20
+ "f2" "f3-f2"}})
+
(defmacro test-pack-unpack
[name encoder decoder]
`(deftest ~name
@@ -74,6 +81,12 @@
(is (= (pu# "f2") "test")))
(is (= (unpack List (pack List recursive ~encoder) ~decoder) recursive))
+
+ (is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder) nested-record))
+ (is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [:f1]) {"f1" 10}))
+ (is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [:f1 :f2]) {"f1" 10 "f2" {"f1" 20 "f2" "f2-f2"}}))
+ (is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [[:f3 :f2]]) {"f3" {"f2" "f3-f2"}}))
+ (is (= (unpack MyNestedRecord (pack MyNestedRecord nested-record ~encoder) ~decoder [:f1 [:f3 :f2]]) {"f1" 10 "f3" {"f2" "f3-f2"}}))
))
View
31 test/simple_avro/utils_tests.clj
@@ -12,7 +12,7 @@
{"field1" "record3" "field2" 30}
{"field1" "record4" "field2" 40}])
-(deftest read-write-test
+(deftest spit-slurp-test
(let [file (java.io.File/createTempFile "avro-test-data", ".tmp")
_ (avro-spit file Test test-records {"m1" "test1" "m2" "test2"})
content (avro-slurp file)
@@ -31,3 +31,32 @@
(while (has-next r)
(println (read-next r))))
+;
+; Reader/writer example
+;
+(comment
+
+ (defn write-read-data
+ []
+ (let [tmp-file (str (System/getProperty "java.io.tmpdir") "test" (rand-int 1000) ".avro")]
+ (let [writer (avro-writer tmp-file Test)]
+ (println "Creating temp file " tmp-file)
+ (doseq [rec (apply concat (repeat 1000 test-records))]
+ (write writer rec))
+ (close writer))
+ (println "Reading " tmp-file)
+ (let [reader (avro-reader tmp-file)]
+ (loop [nx (has-next reader)]
+ (when nx
+ (println "Read " (read-next reader))
+ (println "Position " (position reader))
+ (recur (has-next reader))))
+ (close reader))
+ (println "Deleting " tmp-file)
+ (.delete (java.io.File. tmp-file))))
+
+
+ (deftest writer-reader-test
+ (write-read-data))
+
+)

No commit comments for this range

Something went wrong with that request. Please try again.