Initial commit.

commit 08759da035cd32e218bc04ff3226f9fa2cb705b3 0 parents
Adam Smyczek authored April 12, 2011
2  .gitignore
... ...
@@ -0,0 +1,2 @@
  1
+lib
  2
+classes
177  LICENSE
... ...
@@ -0,0 +1,177 @@
  1
+                              Apache License
  2
+                        Version 2.0, January 2004
  3
+                     http://www.apache.org/licenses/
  4
+
  5
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
  6
+
  7
+1. Definitions.
  8
+
  9
+   "License" shall mean the terms and conditions for use, reproduction,
  10
+   and distribution as defined by Sections 1 through 9 of this document.
  11
+
  12
+   "Licensor" shall mean the copyright owner or entity authorized by
  13
+   the copyright owner that is granting the License.
  14
+
  15
+   "Legal Entity" shall mean the union of the acting entity and all
  16
+   other entities that control, are controlled by, or are under common
  17
+   control with that entity. For the purposes of this definition,
  18
+   "control" means (i) the power, direct or indirect, to cause the
  19
+   direction or management of such entity, whether by contract or
  20
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
  21
+   outstanding shares, or (iii) beneficial ownership of such entity.
  22
+
  23
+   "You" (or "Your") shall mean an individual or Legal Entity
  24
+   exercising permissions granted by this License.
  25
+
  26
+   "Source" form shall mean the preferred form for making modifications,
  27
+   including but not limited to software source code, documentation
  28
+   source, and configuration files.
  29
+
  30
+   "Object" form shall mean any form resulting from mechanical
  31
+   transformation or translation of a Source form, including but
  32
+   not limited to compiled object code, generated documentation,
  33
+   and conversions to other media types.
  34
+
  35
+   "Work" shall mean the work of authorship, whether in Source or
  36
+   Object form, made available under the License, as indicated by a
  37
+   copyright notice that is included in or attached to the work
  38
+   (an example is provided in the Appendix below).
  39
+
  40
+   "Derivative Works" shall mean any work, whether in Source or Object
  41
+   form, that is based on (or derived from) the Work and for which the
  42
+   editorial revisions, annotations, elaborations, or other modifications
  43
+   represent, as a whole, an original work of authorship. For the purposes
  44
+   of this License, Derivative Works shall not include works that remain
  45
+   separable from, or merely link (or bind by name) to the interfaces of,
  46
+   the Work and Derivative Works thereof.
  47
+
  48
+   "Contribution" shall mean any work of authorship, including
  49
+   the original version of the Work and any modifications or additions
  50
+   to that Work or Derivative Works thereof, that is intentionally
  51
+   submitted to Licensor for inclusion in the Work by the copyright owner
  52
+   or by an individual or Legal Entity authorized to submit on behalf of
  53
+   the copyright owner. For the purposes of this definition, "submitted"
  54
+   means any form of electronic, verbal, or written communication sent
  55
+   to the Licensor or its representatives, including but not limited to
  56
+   communication on electronic mailing lists, source code control systems,
  57
+   and issue tracking systems that are managed by, or on behalf of, the
  58
+   Licensor for the purpose of discussing and improving the Work, but
  59
+   excluding communication that is conspicuously marked or otherwise
  60
+   designated in writing by the copyright owner as "Not a Contribution."
  61
+
  62
+   "Contributor" shall mean Licensor and any individual or Legal Entity
  63
+   on behalf of whom a Contribution has been received by Licensor and
  64
+   subsequently incorporated within the Work.
  65
+
  66
+2. Grant of Copyright License. Subject to the terms and conditions of
  67
+   this License, each Contributor hereby grants to You a perpetual,
  68
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
  69
+   copyright license to reproduce, prepare Derivative Works of,
  70
+   publicly display, publicly perform, sublicense, and distribute the
  71
+   Work and such Derivative Works in Source or Object form.
  72
+
  73
+3. Grant of Patent License. Subject to the terms and conditions of
  74
+   this License, each Contributor hereby grants to You a perpetual,
  75
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
  76
+   (except as stated in this section) patent license to make, have made,
  77
+   use, offer to sell, sell, import, and otherwise transfer the Work,
  78
+   where such license applies only to those patent claims licensable
  79
+   by such Contributor that are necessarily infringed by their
  80
+   Contribution(s) alone or by combination of their Contribution(s)
  81
+   with the Work to which such Contribution(s) was submitted. If You
  82
+   institute patent litigation against any entity (including a
  83
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
  84
+   or a Contribution incorporated within the Work constitutes direct
  85
+   or contributory patent infringement, then any patent licenses
  86
+   granted to You under this License for that Work shall terminate
  87
+   as of the date such litigation is filed.
  88
+
  89
+4. Redistribution. You may reproduce and distribute copies of the
  90
+   Work or Derivative Works thereof in any medium, with or without
  91
+   modifications, and in Source or Object form, provided that You
  92
+   meet the following conditions:
  93
+
  94
+   (a) You must give any other recipients of the Work or
  95
+       Derivative Works a copy of this License; and
  96
+
  97
+   (b) You must cause any modified files to carry prominent notices
  98
+       stating that You changed the files; and
  99
+
  100
+   (c) You must retain, in the Source form of any Derivative Works
  101
+       that You distribute, all copyright, patent, trademark, and
  102
+       attribution notices from the Source form of the Work,
  103
+       excluding those notices that do not pertain to any part of
  104
+       the Derivative Works; and
  105
+
  106
+   (d) If the Work includes a "NOTICE" text file as part of its
  107
+       distribution, then any Derivative Works that You distribute must
  108
+       include a readable copy of the attribution notices contained
  109
+       within such NOTICE file, excluding those notices that do not
  110
+       pertain to any part of the Derivative Works, in at least one
  111
+       of the following places: within a NOTICE text file distributed
  112
+       as part of the Derivative Works; within the Source form or
  113
+       documentation, if provided along with the Derivative Works; or,
  114
+       within a display generated by the Derivative Works, if and
  115
+       wherever such third-party notices normally appear. The contents
  116
+       of the NOTICE file are for informational purposes only and
  117
+       do not modify the License. You may add Your own attribution
  118
+       notices within Derivative Works that You distribute, alongside
  119
+       or as an addendum to the NOTICE text from the Work, provided
  120
+       that such additional attribution notices cannot be construed
  121
+       as modifying the License.
  122
+
  123
+   You may add Your own copyright statement to Your modifications and
  124
+   may provide additional or different license terms and conditions
  125
+   for use, reproduction, or distribution of Your modifications, or
  126
+   for any such Derivative Works as a whole, provided Your use,
  127
+   reproduction, and distribution of the Work otherwise complies with
  128
+   the conditions stated in this License.
  129
+
  130
+5. Submission of Contributions. Unless You explicitly state otherwise,
  131
+   any Contribution intentionally submitted for inclusion in the Work
  132
+   by You to the Licensor shall be under the terms and conditions of
  133
+   this License, without any additional terms or conditions.
  134
+   Notwithstanding the above, nothing herein shall supersede or modify
  135
+   the terms of any separate license agreement you may have executed
  136
+   with Licensor regarding such Contributions.
  137
+
  138
+6. Trademarks. This License does not grant permission to use the trade
  139
+   names, trademarks, service marks, or product names of the Licensor,
  140
+   except as required for reasonable and customary use in describing the
  141
+   origin of the Work and reproducing the content of the NOTICE file.
  142
+
  143
+7. Disclaimer of Warranty. Unless required by applicable law or
  144
+   agreed to in writing, Licensor provides the Work (and each
  145
+   Contributor provides its Contributions) on an "AS IS" BASIS,
  146
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  147
+   implied, including, without limitation, any warranties or conditions
  148
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
  149
+   PARTICULAR PURPOSE. You are solely responsible for determining the
  150
+   appropriateness of using or redistributing the Work and assume any
  151
+   risks associated with Your exercise of permissions under this License.
  152
+
  153
+8. Limitation of Liability. In no event and under no legal theory,
  154
+   whether in tort (including negligence), contract, or otherwise,
  155
+   unless required by applicable law (such as deliberate and grossly
  156
+   negligent acts) or agreed to in writing, shall any Contributor be
  157
+   liable to You for damages, including any direct, indirect, special,
  158
+   incidental, or consequential damages of any character arising as a
  159
+   result of this License or out of the use or inability to use the
  160
+   Work (including but not limited to damages for loss of goodwill,
  161
+   work stoppage, computer failure or malfunction, or any and all
  162
+   other commercial damages or losses), even if such Contributor
  163
+   has been advised of the possibility of such damages.
  164
+
  165
+9. Accepting Warranty or Additional Liability. While redistributing
  166
+   the Work or Derivative Works thereof, You may choose to offer,
  167
+   and charge a fee for, acceptance of support, warranty, indemnity,
  168
+   or other liability obligations and/or rights consistent with this
  169
+   License. However, in accepting such obligations, You may act only
  170
+   on Your own behalf and on Your sole responsibility, not on behalf
  171
+   of any other Contributor, and only if You agree to indemnify,
  172
+   defend, and hold each Contributor harmless for any liability
  173
+   incurred by, or claims asserted against, such Contributor by reason
  174
+   of your accepting any such warranty or additional liability.
  175
+
  176
+END OF TERMS AND CONDITIONS
  177
+
48  README.md
... ...
@@ -0,0 +1,48 @@
  1
+# simple-avro
  2
+Clojure wrapper around Avro schema and serialization.
  3
+
  4
+## Quick Start
  5
+
  6
+### Schema definition
  7
+
  8
+    (defavro-enum State
  9
+      "AL" "AK" "AS" "AZ" "AR" "CA" "CO" ; ...
  10
+      )
  11
+
  12
+    (defavro-record Address
  13
+      :street  avro-string
  14
+      :city    avro-string
  15
+      :state   State
  16
+      :zip     avro-int
  17
+      :country avro-string)
  18
+
  19
+    (defavro-record Contact
  20
+      :first   avro-string
  21
+      :last    avro-string
  22
+      :address Address
  23
+      :email   avro-string
  24
+      :phone   (avro-union avro-string avro-null))
  25
+
  26
+_simple-avro_ implements all types defined in the [Avro schema specification](http://avro.apache.org/docs/1.5.0/spec.html).
  27
+Just prepend _avro-_ to the type name or use plain string names. The _defavro-_ macros defined for all named types
  28
+(_defavro-record_, _defavro-enum_ and _defavro-fixed_) create vars that are convenient for hierarchical schema composition.
  29
+The parameters namespace, aliases and doc can be provided in an optional argument map.
  30
+Use string names for type references to define recursive types, for example:
  31
+
  32
+    (defavro-record IntList
  33
+      "value" avro-int 
  34
+      "next"  (avro-union "IntList" avro-null))
  35
+
  36
+### Data serialization
  37
+
  38
+    (def contact {"first" "Mike" "last" ...})
  39
+    (def packed (pack Contact contact <optional encoder>))
  40
+    (assert (= contact (unpack Contact packed <optional decoder>)))
  41
+
  42
+_pack_ serializes objects into generic Avro objects. For JSON or binary serialization, provide the optional _json-encoder_ or _binary-encoder_.
  43
+Use the matching decoder (_json-decoder_ or _binary-decoder_) to deserialize objects with _unpack_.
  44
+
  45
+For more details, see examples and unit tests.
  46
+
  47
+Found a bug? Have a question? Drop me an email at adam.smyczek \_at\_ gmail.com.
  48
+
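A minimal sketch of the round trip the README describes, written against the `pack`, `unpack`, `json-encoder` and `json-decoder` functions defined in `src/simple_avro/core.clj` in this commit. It assumes the Clojure 1.2.0 and Avro 1.5.0 versions pinned in `project.clj`. The `example.roundtrip` namespace, the `Point` record and its field names are made up for illustration and are not part of the commit. Record data uses string keys because the record packer looks fields up by string name and `unpack` returns string keys.

```clojure
(ns example.roundtrip
  (:use (simple-avro schema core)))

;; Hypothetical record type, used only for this sketch.
(defavro-record Point
  "x" avro-int
  "y" avro-int)

;; Record data is a plain map keyed by field name (strings).
(def point {"x" 1 "y" 2})

;; Without an encoder, pack returns a generic Avro object.
(def generic (pack Point point))

;; With json-encoder, pack returns a JSON string instead.
(def as-json (pack Point point json-encoder))

;; Each form is read back with unpack and the matching decoder.
(assert (= point (unpack Point generic)))
(assert (= point (unpack Point as-json json-decoder)))
```

Passing no encoder keeps the data as in-memory Avro objects, which is the form `write-file` in `src/simple_avro/utils.clj` appends to a data file. The encoder argument is only needed when a string or byte representation is wanted.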
12  project.clj
... ...
@@ -0,0 +1,12 @@
  1
+(defproject simple-avro "0.0.2"
  2
+  :description "Clojure wrapper around Avro schema and serialization."
  3
+  :dependencies [[org.clojure/clojure "1.2.0"]
  4
+                 [org.clojure/clojure-contrib "1.2.0"]
  5
+                 [org.apache.avro/avro "1.5.0"]]
  6
+  :disable-deps-clean false
  7
+  :warn-on-reflection true
  8
+  :source-path "src"
  9
+  :test-path "test"
  10
+  :license {:name "Apache License - Version 2.0"
  11
+            :url "http://www.apache.org/licenses/"})
  12
+
58  src/examples/address_book.clj
... ...
@@ -0,0 +1,58 @@
  1
+(ns examples.address-book
  2
+  (:use (simple-avro schema core)))
  3
+
  4
+; Schema
  5
+
  6
+(defavro-enum Country
  7
+  "USA" "Germany" "France" ; ...
  8
+  )
  9
+
  10
+(defavro-enum State
  11
+  "AL" "AK" "AS" "AZ" "AR" "CA" "CO" ; ...
  12
+  )
  13
+
  14
+(defavro-record Address
  15
+  :street  avro-string
  16
+  :city    avro-string
  17
+  :state   State
  18
+  :zip     avro-int
  19
+  :country Country)
  20
+
  21
+(defavro-record Person
  22
+  :first   avro-string
  23
+  :last    avro-string
  24
+  :address Address
  25
+  :email   avro-string
  26
+  :phone   (avro-union avro-string avro-null))
  27
+
  28
+(defavro-record Company
  29
+  :name    avro-string
  30
+  :address Address
  31
+  :contact Person)
  32
+
  33
+(def Contact
  34
+  (avro-union Person Company))
  35
+
  36
+(def AddressBook
  37
+  (avro-array Contact))
  38
+
  39
+
  40
+; Sample records
  41
+
  42
+(def address-book
  43
+  [{"first"  "Mike"
  44
+   "last"    "Foster"
  45
+   "address" {"street"  "South Park Str. 14"
  46
+              "city"    "Wasco"
  47
+              "state"   "CA"
  48
+              "zip"     95171
  49
+              "country" "USA"}
  50
+   "email"   "mike@home.com"
  51
+   "phone"   nil}])
  52
+
  53
+; Serialization
  54
+
  55
+(let [packed-address-book   (pack AddressBook address-book)
  56
+      unpacked-address-book (unpack AddressBook packed-address-book)]
  57
+  (assert (= address-book unpacked-address-book)))
  58
+
252  src/simple_avro/core.clj
... ...
@@ -0,0 +1,252 @@
  1
+(ns simple-avro.core
  2
+  {:doc "Core namespace defines serialization/de-serialization functions."}
  3
+  (:require (clojure.contrib [json :as json]))
  4
+  (:import (java.io FileOutputStream ByteArrayOutputStream)
  5
+           (org.apache.avro Schema Schema$Type Schema$Field)
  6
+           (org.apache.avro.generic GenericData$EnumSymbol
  7
+                                    GenericData$Fixed
  8
+                                    GenericData$Array
  9
+                                    GenericData$Record
  10
+                                    GenericDatumWriter
  11
+                                    GenericDatumReader)
  12
+           (org.apache.avro.io EncoderFactory DecoderFactory)
  13
+           (org.apache.avro.util Utf8)))
  14
+
  15
+
  16
+;
  17
+; Encoding 
  18
+;
  19
+
  20
+(declare pack avro-schema)
  21
+
  22
+(defmacro throw-with-log
  23
+  "Throw exception and log helper."
  24
+  {:private true}
  25
+  [ & msg ]
  26
+  `(let [l#  ~(last msg)]
  27
+     (if (instance? Exception l#)
  28
+       (throw (Exception. (apply str ~(vec (butlast msg))) l#))
  29
+       (throw (Exception. (str ~@msg))))))
  30
+
  31
+(def #^{:private true}
  32
+  *packers*
  33
+  {Schema$Type/NULL     (fn [#^Schema schema obj] nil)
  34
+   Schema$Type/BOOLEAN  (fn [#^Schema schema obj] (boolean obj)) 
  35
+   Schema$Type/INT      (fn [#^Schema schema obj] (int obj))
  36
+   Schema$Type/LONG     (fn [#^Schema schema obj] (long obj))
  37
+   Schema$Type/FLOAT    (fn [#^Schema schema obj] (float obj))
  38
+   Schema$Type/DOUBLE   (fn [#^Schema schema obj] (double obj))
  39
+   Schema$Type/BYTES    (fn [#^Schema schema obj] (bytes obj))
  40
+
  41
+   Schema$Type/STRING   (fn [#^Schema schema obj] (if (string? obj)
  42
+                                           (Utf8. (str obj))
  43
+                                           (throw (Exception. (str "'" obj "' is not a string.")))))
  44
+   Schema$Type/FIXED    (fn [#^Schema schema obj] (GenericData$Fixed. schema obj))
  45
+
  46
+   Schema$Type/ENUM     (fn [#^Schema schema obj] 
  47
+                          (if-let [enum (some #{obj} (.getEnumSymbols schema))]
  48
+                            (GenericData$EnumSymbol. schema enum)
  49
+                            (throw-with-log "Enum does not define '" obj "'.")))
  50
+
  51
+   Schema$Type/UNION    (fn [#^Schema schema obj] 
  52
+                          (loop [schemas (.getTypes schema)]
  53
+                            (if (empty? schemas)
  54
+                              (throw-with-log "No union type defined for object '" obj "'.")
  55
+                              (let [rec (try
  56
+                                          (pack (first schemas) obj)
  57
+                                          (catch Exception e :not-matching-union-type))] 
  58
+                                (if (not= rec :not-matching-union-type)
  59
+                                  rec
  60
+                                  (recur (next schemas)))))))
  61
+
  62
+   Schema$Type/ARRAY    (fn [#^Schema schema obj] 
  63
+                          (let [type-schema (.getElementType schema)
  64
+                                array       (GenericData$Array. (count obj) schema)]
  65
+                            (doseq [e obj] (.add array (pack type-schema e)))
  66
+                            array))
  67
+
  68
+   Schema$Type/MAP      (fn [#^Schema schema obj] 
  69
+                          (let [type-schema (.getValueType schema)]
  70
+                            (reduce (fn [m [k v]] (assoc m k (pack type-schema v))) {} obj)))
  71
+
  72
+   Schema$Type/RECORD   (fn [#^Schema schema obj]
  73
+                          (if-let [ks (keys obj)]
  74
+                            (let [record (GenericData$Record. schema)]
  75
+                              (doseq [#^String k ks]
  76
+                                (let [field (.getField schema k)]
  77
+                                  (.put record k (pack (.schema field) (obj k)))))
  78
+                              record)))
  79
+
  80
+    })
  81
+
  82
+(defn- encode-to
  83
+  [#^Schema schema obj encoder result]
  84
+  (let [stream  (ByteArrayOutputStream.)
  85
+        writer  (GenericDatumWriter. schema)
  86
+        #^java.io.Flushable encoder (encoder schema stream)]
  87
+    (.write writer obj encoder)
  88
+    (.flush encoder)
  89
+    (result stream)))
  90
+
  91
+(defn pack
  92
+  [schema obj & [encoder]]
  93
+  (let [#^Schema schema (avro-schema schema)
  94
+                 type   (.getType schema)
  95
+                 encode (or encoder (fn [_ obj] obj))
  96
+                 packer (*packers* type)]
  97
+    (if packer
  98
+      (try
  99
+        (encode schema (packer schema obj))
  100
+        (catch Exception e
  101
+          (throw-with-log "Exception reading object '" obj "' for schema '" schema "'." e)))
  102
+      (throw-with-log "No pack defined for type '" type "'."))))
  103
+
  104
+(def json-encoder
  105
+  (fn [#^Schema schema obj]
  106
+    (encode-to schema obj
  107
+      (fn [#^Schema schema #^ByteArrayOutputStream stream]
  108
+        (.jsonEncoder (EncoderFactory/get) schema stream))
  109
+      (fn [#^ByteArrayOutputStream stream]
  110
+        (.. stream toString)))))
  111
+
  112
+(def binary-encoder
  113
+  (fn [#^Schema schema obj]
  114
+    (encode-to schema obj
  115
+      (fn [#^Schema schema #^ByteArrayOutputStream stream]
  116
+        (.binaryEncoder (EncoderFactory/get) stream nil))
  117
+      (fn [#^ByteArrayOutputStream stream]
  118
+        (.toByteArray stream))))) ; raw bytes; a String round trip would corrupt binary data
  119
+
  120
+;
  121
+; Decoding
  122
+;
  123
+
  124
+(declare unpack)
  125
+
  126
+(def #^{:private true}
  127
+  *unpackers*
  128
+  {Schema$Type/NULL     (fn [#^Schema schema obj] nil)
  129
+   Schema$Type/BOOLEAN  (fn [#^Schema schema obj] (boolean obj))
  130
+   Schema$Type/INT      (fn [#^Schema schema obj] (int obj))
  131
+   Schema$Type/LONG     (fn [#^Schema schema obj] (long obj))
  132
+   Schema$Type/FLOAT    (fn [#^Schema schema obj] (float obj))
  133
+   Schema$Type/DOUBLE   (fn [#^Schema schema obj] (double obj))
  134
+   Schema$Type/BYTES    (fn [#^Schema schema obj] (bytes obj))
  135
+   Schema$Type/FIXED    (fn [#^Schema schema #^GenericData$Fixed obj] (.bytes obj))
  136
+   Schema$Type/ENUM     (fn [#^Schema schema obj] (str obj))
  137
+
  138
+   Schema$Type/STRING   (fn [#^Schema schema obj] (if (instance? Utf8 obj)
  139
+                                           (str obj)
  140
+                                           (throw (Exception. (str "Object '" obj "' is not a Utf8.")))))
  141
+
  142
+   Schema$Type/UNION    (fn [#^Schema schema obj] 
  143
+                          (loop [schemas (.getTypes schema)]
  144
+                            (if (empty? schemas)
  145
+                              (throw-with-log "No union type defined for object '" obj "'.")
  146
+                              (let [rec (try
  147
+                                          (unpack (first schemas) obj)
  148
+                                          (catch Exception e :not-matching-union-type))]
  149
+                                (if (not= rec :not-matching-union-type)
  150
+                                  rec
  151
+                                  (recur (next schemas)))))))
  152
+
  153
+   Schema$Type/ARRAY    (fn [#^Schema schema obj] 
  154
+                          (let [type-schema (.getElementType schema)]
  155
+                            (vec (map #(unpack type-schema %) obj))))
  156
+
  157
+   Schema$Type/MAP      (fn [#^Schema schema obj] 
  158
+                          (let [type-schema (.getValueType schema)]
  159
+                            (reduce (fn [m [k v]] (assoc m (str k) (unpack type-schema v))) {} obj)))
  160
+
  161
+   Schema$Type/RECORD   (fn [#^Schema schema #^GenericData$Record obj]
  162
+                          (reduce (fn [m #^Schema$Field f]
  163
+                                    (let [k (.name f)]
  164
+                                      (assoc m k (unpack (.schema f) (.get obj k)))))
  165
+                                  {} (.getFields schema)))
  166
+    })
  167
+
  168
+(defn- decode-from
  169
+  [schema obj decoder]
  170
+  (let [stream  (ByteArrayOutputStream.)
  171
+        reader  (GenericDatumReader. schema)
  172
+        decoder (decoder schema obj)]
  173
+    (.read reader nil decoder)))
  174
+
  175
+(defn unpack
  176
+  [schema obj & [decoder]]
  177
+  (let [#^Schema schema   (avro-schema schema)
  178
+                 type     (.getType schema)
  179
+                 decode   (or decoder (fn [_ obj] obj))
  180
+                 unpacker (*unpackers* type)
  181
+                 obj      (decode schema obj)]
  182
+    (if unpacker
  183
+      (try
  184
+        (unpacker schema obj)
  185
+        (catch Exception e
  186
+          (throw-with-log "Exception unpacking object '" obj "' for schema '" schema "'." e)))
  187
+      (throw-with-log "No unpack defined for type '" type "'."))))
  188
+
  189
+(def json-decoder
  190
+  (fn [#^Schema schema obj]
  191
+    (decode-from schema obj
  192
+      (fn [#^Schema schema #^String obj]
  193
+        (.jsonDecoder (DecoderFactory/get) schema obj)))))
  194
+
  195
+(def binary-decoder
  196
+  (fn [#^Schema schema obj]
  197
+    (decode-from schema obj
  198
+      (fn [#^Schema schema #^bytes obj]
  199
+        (.binaryDecoder (DecoderFactory/get) obj nil)))))
  200
+
  201
+; 
  202
+; Avro schema generation
  203
+;
  204
+
  205
+(def named-types nil)
  206
+
  207
+(defn- traverse-schema
  208
+  "Traverse types of a schema."
  209
+  [schema f]
  210
+  (cond 
  211
+    (vector? schema)
  212
+      (vec (map #(f %) schema))
  213
+
  214
+    (map? schema)
  215
+      (case (:type schema)
  216
+        "array"   (assoc schema :items (f (:items schema)))
  217
+        "map"     (assoc schema :values (f (:values schema)))
  218
+        "record"  (assoc schema :fields (vec (map #(assoc % :type (f (:type %))) (:fields schema))))
  219
+        schema)
  220
+    :else schema))
  221
+
  222
+(defn- flatten-named-types
  223
+  "Ensures a named type is only defined once."
  224
+  [schema]
  225
+  (if-let [name (:name schema)]
  226
+    (if (some @named-types [name])
  227
+      name
  228
+      (let [schema (traverse-schema schema flatten-named-types)]
  229
+        (swap! named-types conj name)
  230
+        schema))
  231
+    (traverse-schema schema flatten-named-types)))
  232
+        
  233
+(defn avro-schema
  234
+  "Convert a simple-avro or json string schema to Avro schema object."
  235
+  [schema]
  236
+  (cond 
  237
+    (instance? Schema schema)
  238
+      schema
  239
+    (string? schema)
  240
+      (Schema/parse #^String schema)
  241
+    :else
  242
+      (let [schema (binding [named-types (atom #{})]
  243
+                     (flatten-named-types schema))]
  244
+        (Schema/parse #^String (json/json-str schema)))))
  245
+
  246
+(defn json-schema
  247
+  "Print schema to a json string. Provide optional parameter {:pretty bool}
  248
+  for pretty printing. Default is false."
  249
+  [schema & [opts]]
  250
+  (let [pretty (or (:pretty opts) false)]
  251
+    (.toString #^Schema (avro-schema schema) pretty)))
  252
+
101  src/simple_avro/schema.clj
... ...
@@ -0,0 +1,101 @@
  1
+(ns simple-avro.schema
  2
+  {:doc "Avro 1.5 schema specification.
  3
+        See http://avro.apache.org/docs/1.5.0/spec.html for details."}
  4
+  (:use (clojure.contrib [string :only (as-str)]
  5
+                         [json :only (json-str)])))
  6
+
  7
+(defmacro avro-type
  8
+  "Standard type declaration."
  9
+  [type]
  10
+  `{:type ~type})
  11
+
  12
+; Primitive types
  13
+
  14
+(defmacro avro-prim
  15
+  "Primitive types declaration helper."
  16
+  {:private true}
  17
+  [type]
  18
+  `(def ~(symbol (str "avro-" type))
  19
+     (avro-type ~(str type))))
  20
+
  21
+; Primitive types
  22
+(avro-prim null)
  23
+(avro-prim boolean)
  24
+(avro-prim int)
  25
+(avro-prim long)
  26
+(avro-prim float)
  27
+(avro-prim double)
  28
+(avro-prim bytes)
  29
+(avro-prim string)
  30
+
  31
+; Complex types
  32
+
  33
+(defn avro-array
  34
+  [type]
  35
+  {:type "array" :items type})
  36
+
  37
+(defn avro-map
  38
+  [type]
  39
+  {:type "map" :values type})
  40
+
  41
+(defn avro-union
  42
+  [ & types]
  43
+  (vec types))
  44
+
  45
+; Named types
  46
+
  47
+(defn avro-record
  48
+  [name & decl]
  49
+  (let [schema        {:type "record"
  50
+                       :name name}
  51
+        [schema decl] (if (map? (first decl))
  52
+                        [(merge (first decl) schema) (next decl)]
  53
+                        [schema decl])]
  54
+    (assoc schema :fields
  55
+      (loop [d decl fields []]
  56
+        (if (empty? d)
  57
+          fields
  58
+          (let [[field d] [{:name (as-str (first d))
  59
+                            :type (first (next d))}
  60
+                           (drop 2 d)]
  61
+                [field d] (if (map? (first d))
  62
+                            [(merge (first d) field) (next d)]
  63
+                            [field d])]
  64
+            (recur d (conj fields field))))))))
  65
+
  66
+(defn avro-enum
  67
+  [name & decl]
  68
+  (let [schema        {:type "enum"
  69
+                       :name name}
  70
+        [schema decl] (if (map? (first decl))
  71
+                        [(merge (first decl) schema) (next decl)]
  72
+                        [schema decl])]
  73
+    (assoc schema :symbols (vec (clojure.core/map as-str decl)))))
  74
+
  75
+(defn avro-fixed
  76
+  [name size & [opts]]
  77
+  (let [schema {:type "fixed"
  78
+                :size size
  79
+                :name name}]
  80
+    (if opts
  81
+      (merge opts schema)
  82
+      schema)))
  83
+
  84
+; Convenient named type declaration macros
  85
+
  86
+(defmacro defavro-record
  87
+  [name & decl]
  88
+  `(def ~name
  89
+     (avro-record ~(str name) ~@decl)))
  90
+
  91
+(defmacro defavro-enum
  92
+  [name & decl]
  93
+  `(def ~name
  94
+     (avro-enum ~(str name) ~@decl)))
  95
+
  96
+
  97
+(defmacro defavro-fixed
  98
+  [name size & [opts]]
  99
+  `(def ~name
  100
+     (avro-fixed ~(str name) ~size ~opts)))
  101
+
66  src/simple_avro/utils.clj
... ...
@@ -0,0 +1,66 @@
  1
+(ns simple-avro.utils
  2
+  {:doc "General utils."}
+  (:use (simple-avro core))
+  (:require (clojure.contrib [json :as json]))
+  (:import (org.apache.avro Schema Schema$Type)
+           (org.apache.avro.file CodecFactory
+                                 DataFileWriter
+                                 DataFileReader)
+           (org.apache.avro.generic GenericDatumWriter
+                                    GenericDatumReader)
+           (java.io File)
+           (java.net URI URL)
+           (java.util NoSuchElementException)))
+
+(defmulti #^{:private true}
+  file class)
+
+(defmethod file File [f]
+  f)
+
+(defmethod file String [f]
+  (File. f))
+
+(defmethod file URL [f]
+  (file (.getPath f)))
+
+(defmethod file URI [f]
+  (file (.toURL f)))
+
+(defn write-file
+  "Write to Avro data file."
+  [f schema objs & [meta]]
+  (let [schema (avro-schema schema)
+        writer (DataFileWriter. (GenericDatumWriter. schema))]
+    (try
+      (.setCodec writer (CodecFactory/deflateCodec 6))
+      (doseq [[k v] meta]
+        (.setMeta writer (str k) (str v)))
+      (.create writer schema (file f))
+      (doseq [o objs]
+        (.append writer (pack schema o)))
+      (finally
+        (.close writer)))))
+
+(defn read-file
+  "Read data from Avro data file."
+  [f]
+  (let [reader    (DataFileReader. (file f) (GenericDatumReader.))
+        schema    (.getSchema reader)
+        read-next (fn read-next [reader]
+                    (try
+                      (cons (unpack schema (.next reader)) (read-next reader))
+                      (catch Exception e nil)
+                      (finally
+                        (.close reader))))]
+    (read-next reader)))
+
+(defn read-meta
+  "Read meta from Avro data file."
+  [f]
+  (let [reader (DataFileReader. (file f) (GenericDatumReader.))]
+    (loop [[k & ks] (.getMetaKeys reader) mta {}]
+      (if (nil? k)
+        mta
+        (recur ks (assoc mta k (String. (.getMeta reader k) "UTF-8")))))))
+
83  test/simple_avro/core_tests.clj
@@ -0,0 +1,83 @@
+(ns simple-avro.core-tests
+  (:use (simple-avro schema core)
+        (clojure test)))
+
+(deftest test-prim-types
+  (is (= (pack avro-null    nil)          nil))
+  (is (= (pack avro-null    5)            nil))
+  (is (= (pack avro-boolean true)         true))
+  (is (= (pack avro-boolean nil)          false))
+  (is (= (pack avro-int     5)            5))
+  (is (= (pack avro-long    10)           (long 10)))
+  (is (= (pack avro-long    (long 10))    (long 10)))
+  (is (= (pack avro-float   2.5)          (float 2.5)))
+  (is (= (pack avro-float   (float 2.5))  (float 2.5)))
+  (is (= (pack avro-double  2.5)          (double 2.5)))
+  (is (= (pack avro-double  (double 2.5)) (double 2.5)))
+  (is (= (str (pack avro-string  "test")) "test")))
+
+; Some types
+(def bool-array (avro-array avro-boolean))
+(def int-map    (avro-map avro-int))
+(def a-union    (avro-union avro-string avro-int avro-null))
+
+(defavro-fixed MyFixed 2)
+
+(defavro-enum MyEnum "A" "B" "C")
+
+(defavro-record MyRecord
+  "f1" avro-int
+  "f2" avro-string)
+
+(defavro-record MyNestedRecord
+  "f1" avro-int
+  "f2" avro-string)
+
+(defavro-record List
+  "value" avro-int
+  "next"  (avro-union "List" avro-null))
+
+(def recursive
+  {"value" 1
+   "next"  {"value" 2
+            "next"  {"value" 3
+                     "next"  nil}}})
+
+(defmacro test-pack-unpack
+  [name encoder decoder]
+  `(deftest ~name
+    (is (= (unpack avro-null    (pack avro-null    nil  ~encoder) ~decoder)         nil))
+    (is (= (unpack avro-null    (pack avro-null    5    ~encoder) ~decoder)         nil))
+    (is (= (unpack avro-boolean (pack avro-boolean true ~encoder) ~decoder)         true))
+    (is (= (unpack avro-int     (pack avro-int     5    ~encoder) ~decoder)         5))
+    (is (= (unpack avro-long    (pack avro-long    10   ~encoder) ~decoder)         (long 10)))
+    (is (= (unpack avro-float   (pack avro-float   2.5  ~encoder) ~decoder)         (float 2.5)))
+    (is (= (unpack avro-double  (pack avro-double  2.5  ~encoder) ~decoder)         (double 2.5)))
+    (is (= (str (unpack avro-string (pack avro-string  "test" ~encoder) ~decoder))  "test"))
+
+    (is (= (unpack bool-array (pack bool-array [true false false] ~encoder) ~decoder) [true false false]))
+    (is (= (unpack int-map (pack int-map {"a" 1 "b" 2} ~encoder) ~decoder) {"a" 1 "b" 2}))
+
+    (is (= (unpack a-union (pack a-union "test" ~encoder) ~decoder) "test"))
+    (is (= (unpack a-union (pack a-union 10 ~encoder) ~decoder) 10))
+
+    (let [pu# (unpack MyFixed (pack MyFixed (byte-array [(byte 1) (byte 2)]) ~encoder) ~decoder)]
+      (is (= (nth pu# 0) 1))
+      (is (= (nth pu# 1) 2)))
+
+    (is (= (unpack MyEnum (pack MyEnum "A" ~encoder) ~decoder) "A"))
+    (is (= (unpack MyEnum (pack MyEnum "B" ~encoder) ~decoder) "B"))
+    (is (= (unpack MyEnum (pack MyEnum "C" ~encoder) ~decoder) "C"))
+
+    (let [pu# (unpack MyRecord (pack MyRecord {"f1" 6 "f2" "test"} ~encoder) ~decoder)]
+      (is (= (pu# "f1") 6))
+      (is (= (pu# "f2") "test")))
+
+    (is (= (unpack List (pack List recursive ~encoder) ~decoder) recursive))
+
+  ))
+
+(test-pack-unpack test-prim-types-pack-unpack-no-decoder nil nil)
+(test-pack-unpack test-prim-types-pack-unpack-json json-encoder json-decoder)
+(test-pack-unpack test-prim-types-pack-unpack-binary binary-encoder binary-decoder)
+
65  test/simple_avro/schema_tests.clj
@@ -0,0 +1,65 @@
+(ns simple-avro.schema-tests
+  (:use (simple-avro schema core)
+        (clojure test)))
+
+(deftest test-prim-types
+  (is (= avro-null    {:type "null"}))
+  (is (= avro-boolean {:type "boolean"}))
+  (is (= avro-int     {:type "int"}))
+  (is (= avro-long    {:type "long"}))
+  (is (= avro-float   {:type "float"}))
+  (is (= avro-double  {:type "double"}))
+  (is (= avro-bytes   {:type "bytes"}))
+  (is (= avro-string  {:type "string"})))
+
+(deftest test-complex-types
+  (is (= (avro-array avro-int) {:type "array" :items {:type "int"}}))
+  (is (= (avro-map avro-string) {:type "map" :values {:type "string"}}))
+  (is (= (avro-union avro-string avro-int avro-null)
+         [{:type "string"} {:type "int"} {:type "null"}])))
+
+(deftest test-named-types
+  (is (= (avro-fixed "MyFixed" 16)
+         {:type "fixed" :size 16 :name "MyFixed"}))
+  (is (= (avro-enum "MyEnum" "A" "B" "C")
+         {:type "enum" :symbols ["A" "B" "C"] :name "MyEnum"}))
+  (is (= (avro-record "MyRecord"
+            "f1" avro-int
+            "f2" avro-string)
+         {:type "record"
+          :name "MyRecord"
+          :fields [{:name "f1" :type {:type "int"}}
+                   {:name "f2" :type {:type "string"}}]})))
+
+(defavro-fixed MyDefFixed 16)
+
+(defavro-enum MyDefEnum "A" "B" "C")
+
+(defavro-record MyDefRecord
+  "f1" avro-int
+  "f2" avro-string)
+
+(deftest test-defavro
+  (is (= MyDefFixed {:type "fixed" :size 16 :name "MyDefFixed"}))
+  (is (= MyDefEnum {:type "enum" :symbols ["A" "B" "C"] :name "MyDefEnum"}))
+  (is (= MyDefRecord
+         {:type "record"
+          :name "MyDefRecord"
+          :fields [{:name "f1" :type {:type "int"}}
+                   {:name "f2" :type {:type "string"}}]})))
+
+
+(deftest test-opts
+  (is (= (avro-fixed "MyFixed" 16 {:namespace "test-namespace"})
+         {:type "fixed" :size 16 :name "MyFixed" :namespace "test-namespace"}))
+  (is (= (avro-enum "MyEnum" {:namespace "test-namespace"} "A" "B" "C")
+         {:type "enum" :symbols ["A" "B" "C"] :name "MyEnum" :namespace "test-namespace"}))
+  (is (= (avro-record "MyRecord" {:namespace "test-namespace"}
+            "f1" avro-int
+            "f2" avro-string)
+         {:type "record"
+          :name "MyRecord"
+          :namespace "test-namespace"
+          :fields [{:name "f1" :type {:type "int"}}
+                   {:name "f2" :type {:type "string"}}]})))
+
23  test/simple_avro/utils_tests.clj
@@ -0,0 +1,23 @@
+(ns simple-avro.utils-tests
+  (:use (simple-avro schema core utils)
+        (clojure test)))
+
+(defavro-record Test
+  "field1" avro-string
+  "field2" avro-int)
+
+(def test-records
+  [{"field1" "record1" "field2" 10}
+   {"field1" "record2" "field2" 20}
+   {"field1" "record3" "field2" 30}
+   {"field1" "record4" "field2" 40}])
+
+(deftest read-write-test
+  (let [file    (java.io.File/createTempFile "avro-test-data", ".tmp")
+        _       (write-file file Test test-records {"m1" "test1" "m2" "test2"})
+        content (read-file file)
+        meta    (read-meta file)]
+    (is (= content test-records))
+    (is (= (meta "m1") "test1"))
+    (is (= (meta "m2") "test2"))))
+
