From 63cce29dbba7bd1be09db4998bfcffe26d041a81 Mon Sep 17 00:00:00 2001 From: Johnny Brown Date: Fri, 9 Jan 2015 21:18:21 -0700 Subject: [PATCH 1/6] WIP basic load works on local and cluster --- pigpen-avro/build.gradle | 12 +++ .../src/main/clojure/pigpen/avro/core.clj | 98 +++++++++++++++++++ settings.gradle | 2 +- 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 pigpen-avro/build.gradle create mode 100644 pigpen-avro/src/main/clojure/pigpen/avro/core.clj diff --git a/pigpen-avro/build.gradle b/pigpen-avro/build.gradle new file mode 100644 index 0000000..bd038c4 --- /dev/null +++ b/pigpen-avro/build.gradle @@ -0,0 +1,12 @@ +dependencies { + compile project(':pigpen') + + provided 'org.clojure:clojure:1.6.0' + + compile 'org.apache.avro:avro:1.7.7' + + provided 'org.apache.pig:pig:0.11.1' + provided 'org.apache.hadoop:hadoop-core:1.1.2' + provided 'org.antlr:antlr:3.5.2' + provided 'log4j:log4j:1.2.17' +} diff --git a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj new file mode 100644 index 0000000..bc639f8 --- /dev/null +++ b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj @@ -0,0 +1,98 @@ +(ns pigpen.avro.core + (:require [clojure.string :as str] + [pigpen.raw :as raw] + [pigpen.core :as pig]) + (:import [org.apache.avro + Schema$Type + Schema$Parser + SchemaNormalization] + [org.apache.avro.file DataFileReader] + [org.apache.avro.specific SpecificDatumReader] + [pigpen.local PigPenLocalLoader])) + +(declare field-names) +(defn prefixed-names [record-schema prefix] + (for [subfield (field-names record-schema)] + (str prefix "." subfield))) + +(defn field-names [parsed-schema] + (->> (.getFields parsed-schema) + (map (fn [field] + (let [type (.getType (.schema field))] + (cond + (= type Schema$Type/RECORD) (prefixed-names (.schema field) (.name field)) + ;; pig only supports union of null, . + (= type Schema$Type/UNION) (let [record-fields (->> (.getTypes (.schema field)) + (filter #(= (.getType %) Schema$Type/RECORD)) + (map (fn [rec] (prefixed-names rec (.name field)))))] + (if (empty? record-fields) [(.name field)] + record-fields)) + :else (.name field))))) + flatten + vec)) + +(defn parse-schema [s] (.parse (Schema$Parser.) s)) + +(defmethod pigpen.pig/native->clojure org.apache.avro.util.Utf8 [value] + (str value)) + +(defn dotted-keys->nested-map [kvs] + (->> kvs + (map (fn [[k v]] [(-> k name (clojure.string/split #"\.")) v])) + (reduce (fn [acc [ks v]] + (try + (if v (assoc-in acc (map keyword ks) v) acc) + (catch Exception e (throw (Exception. (str "can't assoc-in record: " acc "\nkeys: " ks "\nvalue: " v "\nkvs: " kvs)))))) {}))) + +;; AWS EMR usage: +;; ============================================================================= +;; run on master node of cluster: +;; ----------------------------------------------------------------------------- +;; curl https://json-simple.googlecode.com/files/json-simple-1.1.1.jar > $PIG_CLASSPATH/json-simple-1.1.1.jar +;; cp /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/avro-1.7.4.jar $PIG_CLASSPATH/ +;; cp /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/snappy-java-1.0.4.1.jar $PIG_CLASSPATH/ + +;; run locally in load-avro directory +;; ----------------------------------------------------------------------------- +;; scp -i ~/schwartz-ci.pem ./target/load-avro-0.1.0-SNAPSHOT-standalone.jar hadoop@ec2-54-187-84-180.us-west-2.compute.amazonaws.com:$(ssh -i ~/schwartz-ci.pem hadoop@ec2-54-187-84-180.us-west-2.compute.amazonaws.com 'echo $PIG_CLASSPATH')/pigpen.jar +(defn load-avro + ([location] (load-avro location {})) + ([location opts] ;; you can add any other params here, like the args or field list + (let [parsed-schema (parse-schema (:schema opts)) + storage (raw/storage$ ;; this creates a new storage definition + ;; add these jars to $PIG_CLASSPATH (most likely /home/hadoop/pig/lib) + ["json-simple-1.1.1.jar" + "piggybank.jar" + "avro-1.7.4.jar" + "snappy-java-1.0.4.1.jar" + ] + "org.apache.pig.piggybank.storage.avro.AvroStorage" ;; your loader class + ["schema" (SchemaNormalization/toParsingForm parsed-schema)]) + field-symbols (map symbol (field-names parsed-schema))] + (-> + (raw/load$ ;; this is a raw pig load command + location ;; the location of the data - this should be a string + field-symbols + storage ;; this is the storage we created earlier + opts) ;; just pass the opts through + (raw/bind$ + '[pigpen.avro.core] + '(pigpen.pig/map->bind (comp + pigpen.avro.core/dotted-keys->nested-map + (pigpen.pig/args->map pigpen.pig/native->clojure))) + {:args (clojure.core/mapcat (juxt str identity) field-symbols) ;; tell it what input fields to use + :field-type-in :native }))))) + +(defn attrs-lookup [record attr-names] + (cond + (nil? record) nil + (empty? attr-names) record + :else (attrs-lookup (.get record (first attr-names)) (rest attr-names)))) + +(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage" [{:keys [location fields] }] + (reify PigPenLocalLoader + (locations [_] (pigpen.local/load-list location)) + (init-reader [_ filename] + (-> filename (java.io.File.) (DataFileReader. (SpecificDatumReader.)))) + (read [_ reader] (for [datum reader] (zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields)))) + (close-reader [_ reader] (.close reader)))) diff --git a/settings.gradle b/settings.gradle index 238031f..2988181 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,3 +1,3 @@ rootProject.name='pigpen-multi' -include 'pigpen-core','pigpen-parquet' +include 'pigpen-core','pigpen-parquet','pigpen-avro' project(':pigpen-core').name = 'pigpen' From c3e119f60c9b787855f95b8d46b15de21d71460b Mon Sep 17 00:00:00 2001 From: Johnny Brown Date: Sun, 11 Jan 2015 14:54:46 -0700 Subject: [PATCH 2/6] initial test of load-avro --- pigpen-avro/resources/example_data.avro | Bin 0 -> 1456 bytes pigpen-avro/resources/example_data.json | 67 ++++++++++ pigpen-avro/resources/example_schema.avsc | 114 ++++++++++++++++++ .../clojure/pigpen/functional/avro_test.clj | 82 +++++++++++++ 4 files changed, 263 insertions(+) create mode 100644 pigpen-avro/resources/example_data.avro create mode 100644 pigpen-avro/resources/example_data.json create mode 100644 pigpen-avro/resources/example_schema.avsc create mode 100644 pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj diff --git a/pigpen-avro/resources/example_data.avro b/pigpen-avro/resources/example_data.avro new file mode 100644 index 0000000000000000000000000000000000000000..16fca54b835fe286434fce243867fb14c59283bf GIT binary patch literal 1456 zcmaJ>ziSj(9G?VPBph6$DGsaQ4lMG*%zN|WO_~!VkS$I_5-gNezBBLp+|l`!nOT!f z%)cOLEn;D%V560Vm87w;QS2-%t~n$ih;Sgj+0D!jyFs>^neXT4`@X&G-Ub@zM|(Cq`Amq}o`WB?|GLFA1fDADbg}!oa zzsCA`x>I1j6~7L+pQf;O}CnJ}wxodhdh$7|SI9abnIY|1DhzP~%Zh%=CrjvLz z>=!X7klALjCcGFyQ8cC1CR!ev$$zjY;3hV+wSyYdThawe7(}JMQbjt6W3zayg+b;uzuv`*0#3nt~Zp{=0Beb42N6qfCA z3DgZtfy}m~%j^rT;DWm-88hv`aVb#lJAhO$(-)qEHOgHkx#f_6J1()%;DiHkVlzWJ z%=ZzgKfh-9fYP>YVks_SabU!!CMS+B1ylAc41}g>5C5x5INcdsUr>LL+dmGqD~Hd%_)U1huV4ddVc=t+t;r?&3#D!Wz$FW6GG$W2mk;8 literal 0 HcmV?d00001 diff --git a/pigpen-avro/resources/example_data.json b/pigpen-avro/resources/example_data.json new file mode 100644 index 0000000..082ca65 --- /dev/null +++ b/pigpen-avro/resources/example_data.json @@ -0,0 +1,67 @@ +{ + "browserTimestamp": 1417997369042, + "rawHash": { + "uId": "14125918269", + "bNo": "15959", + "rallyRequestId": "qs-app-04144pliuzkfbmu1aiwzrecc81s5.qs-app-049893092", + "queryParams": "", + "sId": "12850" + }, + "metadata": { + "schema_id": "e48b9786fd4598fa27c1de354f735" + }, + "requestSpan": { + "metadata": { + "schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa" + }, + "unhandledException": {"null": null}, + "handledExceptions": {"null": null}, + "spanId": "2873f846-c867-4e08-8aa8-530f63bbee2b", + "parentSpanId": {"string": "0baef555-47ef-48a3-b218-6bfd33094ecd"} + }, + "panel": null +} +{ + "browserTimestamp": 1417997368078, + "rawHash": { + "uId": "14125918269", + "bNo": "15959", + "queryParams": "", + "sId": "12850" + }, + "metadata": { + "schema_id": "e48b9786fd4598fa27c1de354f735" + }, + "requestSpan": { + "metadata": { + "schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa" + }, + "unhandledException": {"null": null}, + "handledExceptions": {"null": null}, + "spanId": "3e313af4-76b8-48cf-abc8-92033f225126", + "parentSpanId": {"string": "7673ed70-cfd0-4fb3-9722-33ae8a977301"} + }, + "panel": {"null": null } +} +{ + "browserTimestamp": 1417997392079, + "rawHash": { + "uId": "14125918269", + "bNo": "15959", + "queryParams": "", + "sId": "12850" + }, + "metadata": { + "schema_id": "e48b9786fd4598fa27c1de354f735" + }, + "requestSpan": { + "metadata": { + "schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa" + }, + "unhandledException": {"null": null}, + "handledExceptions": {"null": null}, + "spanId": "3e313af4-76b8-48cf-abc8-92033f225126", + "parentSpanId": null + }, + "panel": {"com.example.PanelInfo":{"defOid": {"long": 29991883477}}} +} diff --git a/pigpen-avro/resources/example_schema.avsc b/pigpen-avro/resources/example_schema.avsc new file mode 100644 index 0000000..9996556 --- /dev/null +++ b/pigpen-avro/resources/example_schema.avsc @@ -0,0 +1,114 @@ +{ + "type": "record", + "name": "ExampleRecord", + "namespace": "com.example", + "fields": [ + { + "name": "browserTimestamp", + "type": "long" + }, + { + "name": "rawHash", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": "metadata", + "type": { + "type": "record", + "name": "SharedRecordSchema", + "namespace": "com.example", + "fields": [ + { + "name": "schema_id", + "type": "string" + } + ] + } + }, + { + "name": "requestSpan", + "type": { + "type": "record", + "name": "ExampleNestedRecord", + "fields": [ + { + "name": "metadata", + "type": "com.example.SharedRecordSchema" + }, + { + "name": "spanId", + "type": "string" + }, + { + "name": "parentSpanId", + "type": [ + "null", + "string" + ] + }, + { + "name": "unhandledException", + "type": [ + "null", + { + "type": "record", + "name": "Exception", + "fields": [ + { + "name": "exceptionClass", + "type": "string" + }, + { + "name": "exceptionMessage", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "exceptionStack", + "type": "string" + } + ] + } + ] + }, + { + "name": "handledExceptions", + "type": [ + "null", + { + "type": "array", + "items": "Exception" + } + ] + } + ] + } + }, + { + "name": "panel", + "type": [ + "null", + { + "type": "record", + "name": "PanelInfo", + "fields": [ + { + "name": "defOid", + "type": [ + "null", + "long" + ], + "default": null + } + ] + } + ] + } + ] +} diff --git a/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj new file mode 100644 index 0000000..c9eba71 --- /dev/null +++ b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj @@ -0,0 +1,82 @@ +;; +;; +;; Copyright 2013 Netflix, Inc. +;; +;; Licensed under the Apache License, Version 2.0 (the "License"); +;; you may not use this file except in compliance with the License. +;; You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +;; +;; + +(ns pigpen.functional.avro-test + (:require [clojure.test :refer :all] + [pigpen.extensions.test :refer [test-diff pigsym-zero pigsym-inc regex->string]] + [pigpen.core :as pig] + [pigpen.fold :as fold] + [pigpen.avro.core :as pig-avro])) + +(.mkdirs (java.io.File. "build/functional/avro-test/test-avro")) + +(def clj-data + [{:browserTimestamp 1417997369042, + :requestSpan + {:spanId "2873f846-c867-4e08-8aa8-530f63bbee2b", + :parentSpanId "0baef555-47ef-48a3-b218-6bfd33094ecd", + :metadata {:schema_id "7d8d87ef3315c672a28b6ae2d31bd9fa"}}, + :metadata {:schema_id "e48b9786fd4598fa27c1de354f735"}, + :rawHash + {"queryParams" "", + "rallyRequestId" + "qs-app-04144pliuzkfbmu1aiwzrecc81s5.qs-app-049893092", + "uId" "14125918269", + "bNo" "15959", + "sId" "12850"}} + {:browserTimestamp 1417997368078, + :requestSpan + {:spanId "3e313af4-76b8-48cf-abc8-92033f225126", + :parentSpanId "7673ed70-cfd0-4fb3-9722-33ae8a977301", + :metadata {:schema_id "7d8d87ef3315c672a28b6ae2d31bd9fa"}}, + :metadata {:schema_id "e48b9786fd4598fa27c1de354f735"}, + :rawHash + {"queryParams" "", + "uId" "14125918269", + "bNo" "15959", + "sId" "12850"}} + {:browserTimestamp 1417997392079, + :requestSpan + {:spanId "3e313af4-76b8-48cf-abc8-92033f225126", + :metadata {:schema_id "7d8d87ef3315c672a28b6ae2d31bd9fa"}}, + :metadata {:schema_id "e48b9786fd4598fa27c1de354f735"}, + :rawHash + {"queryParams" "", + "uId" "14125918269", + "bNo" "15959", + "sId" "12850"} + :panel {:defOid 29991883477}}]) + +(deftest test-avro + (let [ + query (pig-avro/load-avro "resources/example_data.avro" + {:schema (slurp "resources/example_schema.avsc") + :implicit-schema true})] + (is (= (pig/dump query) clj-data)))) + +(deftest test-fold + (let [ + query (->> + (pig-avro/load-avro "resources/example_data.avro" + {:schema (slurp "resources/example_schema.avsc") + :implicit-schema true}) + (pig/map #(get % :browserTimestamp 0)) + (pig/fold (fold/sum)))] + (is (= (pig/dump query) [4253992129199])))) + +(comment (run-tests)) From 792c4cd774f20ccef1e621ab014f06a96c16e392 Mon Sep 17 00:00:00 2001 From: Johnny Brown Date: Sun, 11 Jan 2015 17:54:40 -0700 Subject: [PATCH 3/6] Add documentation, make schema non-optional arg to load-avro --- .../src/main/clojure/pigpen/avro/core.clj | 60 ++++++++++++------- .../clojure/pigpen/functional/avro_test.clj | 14 ++--- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj index bc639f8..900b853 100644 --- a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj +++ b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj @@ -1,3 +1,21 @@ +;; +;; +;; Copyright 2013 Netflix, Inc. +;; +;; Licensed under the Apache License, Version 2.0 (the "License"); +;; you may not use this file except in compliance with the License. +;; You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +;; +;; + (ns pigpen.avro.core (:require [clojure.string :as str] [pigpen.raw :as raw] @@ -44,21 +62,24 @@ (if v (assoc-in acc (map keyword ks) v) acc) (catch Exception e (throw (Exception. (str "can't assoc-in record: " acc "\nkeys: " ks "\nvalue: " v "\nkvs: " kvs)))))) {}))) -;; AWS EMR usage: -;; ============================================================================= -;; run on master node of cluster: -;; ----------------------------------------------------------------------------- -;; curl https://json-simple.googlecode.com/files/json-simple-1.1.1.jar > $PIG_CLASSPATH/json-simple-1.1.1.jar -;; cp /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/avro-1.7.4.jar $PIG_CLASSPATH/ -;; cp /home/hadoop/.versions/2.4.0/share/hadoop/common/lib/snappy-java-1.0.4.1.jar $PIG_CLASSPATH/ -;; run locally in load-avro directory -;; ----------------------------------------------------------------------------- -;; scp -i ~/schwartz-ci.pem ./target/load-avro-0.1.0-SNAPSHOT-standalone.jar hadoop@ec2-54-187-84-180.us-west-2.compute.amazonaws.com:$(ssh -i ~/schwartz-ci.pem hadoop@ec2-54-187-84-180.us-west-2.compute.amazonaws.com 'echo $PIG_CLASSPATH')/pigpen.jar (defn load-avro - ([location] (load-avro location {})) - ([location opts] ;; you can add any other params here, like the args or field list - (let [parsed-schema (parse-schema (:schema opts)) + "*** ALPHA - Subject to change *** + + Loads data from an avro file. Returns data as maps with keyword keys corresponding to avro field names. Fields with avro type \"map\" will be maps with string keys. + + Example: + + ;; avro schemas are defined [on the project's website](http://avro.apache.org/docs/1.7.7/spec.html#schemas) + (pig-avro/load-avro \"input.avro\" (slurp \"schemafile.json\")) + + (pig-avro/load-avro \"input.avro\" + {\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"foo\", + \"fields\": [{\"name\": \"wurdz\", \"type\": \"string\"}, {\"name\": \"bar\", \"type\": \"int\"}] }) + +" + ([location schema] + (let [parsed-schema (parse-schema schema) storage (raw/storage$ ;; this creates a new storage definition ;; add these jars to $PIG_CLASSPATH (most likely /home/hadoop/pig/lib) ["json-simple-1.1.1.jar" @@ -70,11 +91,7 @@ ["schema" (SchemaNormalization/toParsingForm parsed-schema)]) field-symbols (map symbol (field-names parsed-schema))] (-> - (raw/load$ ;; this is a raw pig load command - location ;; the location of the data - this should be a string - field-symbols - storage ;; this is the storage we created earlier - opts) ;; just pass the opts through + (raw/load$ location field-symbols storage {:implicit-schema true}) (raw/bind$ '[pigpen.avro.core] '(pigpen.pig/map->bind (comp @@ -89,10 +106,13 @@ (empty? attr-names) record :else (attrs-lookup (.get record (first attr-names)) (rest attr-names)))) -(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage" [{:keys [location fields] }] +(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage" + [{:keys [location fields] }] (reify PigPenLocalLoader (locations [_] (pigpen.local/load-list location)) (init-reader [_ filename] (-> filename (java.io.File.) (DataFileReader. (SpecificDatumReader.)))) - (read [_ reader] (for [datum reader] (zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields)))) + (read [_ reader] + (for [datum reader] + (zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields)))) (close-reader [_ reader] (.close reader)))) diff --git a/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj index c9eba71..365dde1 100644 --- a/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj +++ b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj @@ -63,18 +63,14 @@ :panel {:defOid 29991883477}}]) (deftest test-avro - (let [ - query (pig-avro/load-avro "resources/example_data.avro" - {:schema (slurp "resources/example_schema.avsc") - :implicit-schema true})] + (let [query (pig-avro/load-avro + "resources/example_data.avro" (slurp "resources/example_schema.avsc"))] (is (= (pig/dump query) clj-data)))) (deftest test-fold - (let [ - query (->> - (pig-avro/load-avro "resources/example_data.avro" - {:schema (slurp "resources/example_schema.avsc") - :implicit-schema true}) + (let [query (->> + (pig-avro/load-avro + "resources/example_data.avro" (slurp "resources/example_schema.avsc")) (pig/map #(get % :browserTimestamp 0)) (pig/fold (fold/sum)))] (is (= (pig/dump query) [4253992129199])))) From 96033876f2636b2d332422484293e78cc2aad7dd Mon Sep 17 00:00:00 2001 From: Johnny Brown Date: Sun, 11 Jan 2015 18:36:24 -0700 Subject: [PATCH 4/6] use dependency system for needed jars --- pigpen-avro/build.gradle | 3 +++ .../src/main/clojure/pigpen/avro/core.clj | 20 +++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pigpen-avro/build.gradle b/pigpen-avro/build.gradle index bd038c4..52284bf 100644 --- a/pigpen-avro/build.gradle +++ b/pigpen-avro/build.gradle @@ -4,6 +4,9 @@ dependencies { provided 'org.clojure:clojure:1.6.0' compile 'org.apache.avro:avro:1.7.7' + compile 'com.googlecode.json-simple:json-simple:1.1.1' + compile 'org.xerial.snappy:snappy-java:1.0.4.1' + provided 'org.apache.pig:pig:0.11.1' provided 'org.apache.hadoop:hadoop-core:1.1.2' diff --git a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj index 900b853..999cb04 100644 --- a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj +++ b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj @@ -28,6 +28,8 @@ [org.apache.avro.specific SpecificDatumReader] [pigpen.local PigPenLocalLoader])) +(set! *warn-on-reflection* true) + (declare field-names) (defn prefixed-names [record-schema prefix] (for [subfield (field-names record-schema)] @@ -71,23 +73,19 @@ Example: ;; avro schemas are defined [on the project's website](http://avro.apache.org/docs/1.7.7/spec.html#schemas) + ;; load-avro takes the schema as a string. (pig-avro/load-avro \"input.avro\" (slurp \"schemafile.json\")) (pig-avro/load-avro \"input.avro\" - {\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"foo\", - \"fields\": [{\"name\": \"wurdz\", \"type\": \"string\"}, {\"name\": \"bar\", \"type\": \"int\"}] }) + \"{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"foo\", + \"fields\": [{\"name\": \"wurdz\", \"type\": \"string\"}, {\"name\": \"bar\", \"type\": \"int\"}] }\") " ([location schema] (let [parsed-schema (parse-schema schema) - storage (raw/storage$ ;; this creates a new storage definition - ;; add these jars to $PIG_CLASSPATH (most likely /home/hadoop/pig/lib) - ["json-simple-1.1.1.jar" - "piggybank.jar" - "avro-1.7.4.jar" - "snappy-java-1.0.4.1.jar" - ] - "org.apache.pig.piggybank.storage.avro.AvroStorage" ;; your loader class + storage (raw/storage$ + ["piggybank.jar"] + "org.apache.pig.piggybank.storage.avro.AvroStorage" ["schema" (SchemaNormalization/toParsingForm parsed-schema)]) field-symbols (map symbol (field-names parsed-schema))] (-> @@ -97,7 +95,7 @@ '(pigpen.pig/map->bind (comp pigpen.avro.core/dotted-keys->nested-map (pigpen.pig/args->map pigpen.pig/native->clojure))) - {:args (clojure.core/mapcat (juxt str identity) field-symbols) ;; tell it what input fields to use + {:args (clojure.core/mapcat (juxt str identity) field-symbols) :field-type-in :native }))))) (defn attrs-lookup [record attr-names] From 276ba1b1567152a732212de0d17e75b223ff2119 Mon Sep 17 00:00:00 2001 From: Johnny Brown Date: Mon, 12 Jan 2015 14:23:34 -0700 Subject: [PATCH 5/6] add type hints to avoid reflection --- .../src/main/clojure/pigpen/avro/core.clj | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj index 999cb04..f95b171 100644 --- a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj +++ b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj @@ -21,11 +21,14 @@ [pigpen.raw :as raw] [pigpen.core :as pig]) (:import [org.apache.avro + Schema + Schema$Field Schema$Type Schema$Parser SchemaNormalization] [org.apache.avro.file DataFileReader] [org.apache.avro.specific SpecificDatumReader] + [org.apache.avro.generic GenericData$Record] [pigpen.local PigPenLocalLoader])) (set! *warn-on-reflection* true) @@ -35,15 +38,15 @@ (for [subfield (field-names record-schema)] (str prefix "." subfield))) -(defn field-names [parsed-schema] +(defn field-names [^Schema parsed-schema] (->> (.getFields parsed-schema) - (map (fn [field] - (let [type (.getType (.schema field))] + (map (fn [^Schema$Field field] + (let [type (.getType ^{:tag Schema} (.schema field))] (cond (= type Schema$Type/RECORD) (prefixed-names (.schema field) (.name field)) ;; pig only supports union of null, . (= type Schema$Type/UNION) (let [record-fields (->> (.getTypes (.schema field)) - (filter #(= (.getType %) Schema$Type/RECORD)) + (filter #(= (.getType ^{:tag Schema} %) Schema$Type/RECORD)) (map (fn [rec] (prefixed-names rec (.name field)))))] (if (empty? record-fields) [(.name field)] record-fields)) @@ -51,7 +54,8 @@ flatten vec)) -(defn parse-schema [s] (.parse (Schema$Parser.) s)) +(defn parse-schema [^String s] (let [parser ^Schema$Parser (Schema$Parser.)] + (.parse parser s))) (defmethod pigpen.pig/native->clojure org.apache.avro.util.Utf8 [value] (str value)) @@ -82,7 +86,7 @@ " ([location schema] - (let [parsed-schema (parse-schema schema) + (let [^Schema parsed-schema (parse-schema schema) storage (raw/storage$ ["piggybank.jar"] "org.apache.pig.piggybank.storage.avro.AvroStorage" @@ -98,19 +102,19 @@ {:args (clojure.core/mapcat (juxt str identity) field-symbols) :field-type-in :native }))))) -(defn attrs-lookup [record attr-names] +(defn attrs-lookup [^GenericData$Record record attr-names] (cond (nil? record) nil (empty? attr-names) record - :else (attrs-lookup (.get record (first attr-names)) (rest attr-names)))) + :else (attrs-lookup (.get record ^{:tag java.lang.String} (first attr-names)) (rest attr-names)))) (defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage" [{:keys [location fields] }] (reify PigPenLocalLoader (locations [_] (pigpen.local/load-list location)) (init-reader [_ filename] - (-> filename (java.io.File.) (DataFileReader. (SpecificDatumReader.)))) + (-> ^java.lang.String filename (java.io.File.) (DataFileReader. (SpecificDatumReader.)))) (read [_ reader] - (for [datum reader] + (for [datum ^SpecificDatumReader reader] (zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields)))) - (close-reader [_ reader] (.close reader)))) + (close-reader [_ reader] (.close ^{:tag java.io.Closeable} reader)))) From ddf99d72d91b22634b192523dd3abafd3a17ed47 Mon Sep 17 00:00:00 2001 From: Johnny Brown Date: Tue, 13 Jan 2015 16:00:46 -0700 Subject: [PATCH 6/6] make some things private, add some docs --- .../src/main/clojure/pigpen/avro/core.clj | 26 ++++++++++++------- .../clojure/pigpen/functional/avro_test.clj | 4 +++ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj index f95b171..8bf4a03 100644 --- a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj +++ b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj @@ -34,11 +34,11 @@ (set! *warn-on-reflection* true) (declare field-names) -(defn prefixed-names [record-schema prefix] +(defn- prefixed-names [record-schema prefix] (for [subfield (field-names record-schema)] (str prefix "." subfield))) -(defn field-names [^Schema parsed-schema] +(defn- field-names [^Schema parsed-schema] (->> (.getFields parsed-schema) (map (fn [^Schema$Field field] (let [type (.getType ^{:tag Schema} (.schema field))] @@ -54,8 +54,8 @@ flatten vec)) -(defn parse-schema [^String s] (let [parser ^Schema$Parser (Schema$Parser.)] - (.parse parser s))) +(defn- parse-schema [^String s] + (.parse ^{:tag Schema$Parser} (Schema$Parser.) s)) (defmethod pigpen.pig/native->clojure org.apache.avro.util.Utf8 [value] (str value)) @@ -66,8 +66,10 @@ (reduce (fn [acc [ks v]] (try (if v (assoc-in acc (map keyword ks) v) acc) - (catch Exception e (throw (Exception. (str "can't assoc-in record: " acc "\nkeys: " ks "\nvalue: " v "\nkvs: " kvs)))))) {}))) - + (catch Exception e + (throw (Exception. + (str "can't assoc-in record: " acc "\nkeys: " + ks "\nvalue: " v "\nkvs: " kvs)))))) {}))) (defn load-avro "*** ALPHA - Subject to change *** @@ -82,8 +84,14 @@ (pig-avro/load-avro \"input.avro\" \"{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"foo\", - \"fields\": [{\"name\": \"wurdz\", \"type\": \"string\"}, {\"name\": \"bar\", \"type\": \"int\"}] }\") - + \"fields\": [{\"name\": \"wurdz\", \"type\": \"string\"}, {\"name\": \"bar\", \"type\": \"int\"}] }\") + + Make sure a + [piggybank.jar](http://mvnrepository.com/artifact/org.apache.pig/piggybank/0.14.0) + compatible with your version of hadoop is on $PIG_CLASSPATH. For + hadoop v2, see http://stackoverflow.com/a/21753749. Amazon's elastic + mapreduce comes with a compatible piggybank.jar already on the + classpath. " ([location schema] (let [^Schema parsed-schema (parse-schema schema) @@ -102,7 +110,7 @@ {:args (clojure.core/mapcat (juxt str identity) field-symbols) :field-type-in :native }))))) -(defn attrs-lookup [^GenericData$Record record attr-names] +(defn- attrs-lookup [^GenericData$Record record attr-names] (cond (nil? record) nil (empty? attr-names) record diff --git a/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj index 365dde1..235d7b5 100644 --- a/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj +++ b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj @@ -62,6 +62,10 @@ "sId" "12850"} :panel {:defOid 29991883477}}]) + +;; Example data was generated like, +;; $ java -jar ~/avro-tools-1.7.7.jar fromjson --codec snappy \ +;; --schema-file example_schema.avsc example_data.json (deftest test-avro (let [query (pig-avro/load-avro "resources/example_data.avro" (slurp "resources/example_schema.avsc"))]