diff --git a/pigpen-avro/build.gradle b/pigpen-avro/build.gradle new file mode 100644 index 0000000..52284bf --- /dev/null +++ b/pigpen-avro/build.gradle @@ -0,0 +1,15 @@ +dependencies { + compile project(':pigpen') + + provided 'org.clojure:clojure:1.6.0' + + compile 'org.apache.avro:avro:1.7.7' + compile 'com.googlecode.json-simple:json-simple:1.1.1' + compile 'org.xerial.snappy:snappy-java:1.0.4.1' + + + provided 'org.apache.pig:pig:0.11.1' + provided 'org.apache.hadoop:hadoop-core:1.1.2' + provided 'org.antlr:antlr:3.5.2' + provided 'log4j:log4j:1.2.17' +} diff --git a/pigpen-avro/resources/example_data.avro b/pigpen-avro/resources/example_data.avro new file mode 100644 index 0000000..16fca54 Binary files /dev/null and b/pigpen-avro/resources/example_data.avro differ diff --git a/pigpen-avro/resources/example_data.json b/pigpen-avro/resources/example_data.json new file mode 100644 index 0000000..082ca65 --- /dev/null +++ b/pigpen-avro/resources/example_data.json @@ -0,0 +1,67 @@ +{ + "browserTimestamp": 1417997369042, + "rawHash": { + "uId": "14125918269", + "bNo": "15959", + "rallyRequestId": "qs-app-04144pliuzkfbmu1aiwzrecc81s5.qs-app-049893092", + "queryParams": "", + "sId": "12850" + }, + "metadata": { + "schema_id": "e48b9786fd4598fa27c1de354f735" + }, + "requestSpan": { + "metadata": { + "schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa" + }, + "unhandledException": {"null": null}, + "handledExceptions": {"null": null}, + "spanId": "2873f846-c867-4e08-8aa8-530f63bbee2b", + "parentSpanId": {"string": "0baef555-47ef-48a3-b218-6bfd33094ecd"} + }, + "panel": null +} +{ + "browserTimestamp": 1417997368078, + "rawHash": { + "uId": "14125918269", + "bNo": "15959", + "queryParams": "", + "sId": "12850" + }, + "metadata": { + "schema_id": "e48b9786fd4598fa27c1de354f735" + }, + "requestSpan": { + "metadata": { + "schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa" + }, + "unhandledException": {"null": null}, + "handledExceptions": {"null": null}, + "spanId": 
"3e313af4-76b8-48cf-abc8-92033f225126", + "parentSpanId": {"string": "7673ed70-cfd0-4fb3-9722-33ae8a977301"} + }, + "panel": {"null": null } +} +{ + "browserTimestamp": 1417997392079, + "rawHash": { + "uId": "14125918269", + "bNo": "15959", + "queryParams": "", + "sId": "12850" + }, + "metadata": { + "schema_id": "e48b9786fd4598fa27c1de354f735" + }, + "requestSpan": { + "metadata": { + "schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa" + }, + "unhandledException": {"null": null}, + "handledExceptions": {"null": null}, + "spanId": "3e313af4-76b8-48cf-abc8-92033f225126", + "parentSpanId": null + }, + "panel": {"com.example.PanelInfo":{"defOid": {"long": 29991883477}}} +} diff --git a/pigpen-avro/resources/example_schema.avsc b/pigpen-avro/resources/example_schema.avsc new file mode 100644 index 0000000..9996556 --- /dev/null +++ b/pigpen-avro/resources/example_schema.avsc @@ -0,0 +1,114 @@ +{ + "type": "record", + "name": "ExampleRecord", + "namespace": "com.example", + "fields": [ + { + "name": "browserTimestamp", + "type": "long" + }, + { + "name": "rawHash", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": "metadata", + "type": { + "type": "record", + "name": "SharedRecordSchema", + "namespace": "com.example", + "fields": [ + { + "name": "schema_id", + "type": "string" + } + ] + } + }, + { + "name": "requestSpan", + "type": { + "type": "record", + "name": "ExampleNestedRecord", + "fields": [ + { + "name": "metadata", + "type": "com.example.SharedRecordSchema" + }, + { + "name": "spanId", + "type": "string" + }, + { + "name": "parentSpanId", + "type": [ + "null", + "string" + ] + }, + { + "name": "unhandledException", + "type": [ + "null", + { + "type": "record", + "name": "Exception", + "fields": [ + { + "name": "exceptionClass", + "type": "string" + }, + { + "name": "exceptionMessage", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "exceptionStack", + "type": "string" + } + ] + } + ] + }, + { + "name": 
"handledExceptions",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": "Exception"
+              }
+            ]
+          }
+        ]
+      }
+    },
+    {
+      "name": "panel",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "PanelInfo",
+          "fields": [
+            {
+              "name": "defOid",
+              "type": [
+                "null",
+                "long"
+              ],
+              "default": null
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/pigpen-avro/src/main/clojure/pigpen/avro/core.clj b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj
new file mode 100644
index 0000000..8bf4a03
--- /dev/null
+++ b/pigpen-avro/src/main/clojure/pigpen/avro/core.clj
@@ -0,0 +1,182 @@
+;;
+;;
+;;  Copyright 2013 Netflix, Inc.
+;;
+;;     Licensed under the Apache License, Version 2.0 (the "License");
+;;     you may not use this file except in compliance with the License.
+;;     You may obtain a copy of the License at
+;;
+;;         http://www.apache.org/licenses/LICENSE-2.0
+;;
+;;     Unless required by applicable law or agreed to in writing, software
+;;     distributed under the License is distributed on an "AS IS" BASIS,
+;;     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;;     See the License for the specific language governing permissions and
+;;     limitations under the License.
+;;
+;;
+
+(ns pigpen.avro.core
+  (:require [clojure.string :as str]
+            [pigpen.raw :as raw]
+            [pigpen.core :as pig]
+            ;; Loaded explicitly: this file extends pigpen.pig/native->clojure and
+            ;; pigpen.local/load, and calls pigpen.local/load-list.
+            [pigpen.pig]
+            [pigpen.local])
+  (:import [org.apache.avro
+            Schema
+            Schema$Field
+            Schema$Type
+            Schema$Parser
+            SchemaNormalization]
+           [org.apache.avro.file DataFileReader]
+           [org.apache.avro.specific SpecificDatumReader]
+           [org.apache.avro.generic GenericData$Record]
+           [pigpen.local PigPenLocalLoader]))
+
+(set! *warn-on-reflection* true)
+
+(declare field-names)
+
+(defn- prefixed-names
+  "Returns the names produced by (field-names record-schema), each prefixed
+  with `prefix` and a dot."
+  [record-schema prefix]
+  (for [subfield (field-names record-schema)]
+    (str prefix "." subfield)))
+
+(defn- field-names
+  "Returns a vector of the flattened field names of parsed-schema. Fields of a
+  nested record are emitted as dotted paths (\"outer.inner\"). For a union
+  field, the fields of every record branch are emitted the same way; a union
+  with no record branch contributes just the field's own name."
+  [^Schema parsed-schema]
+  (->> (.getFields parsed-schema)
+       (map (fn [^Schema$Field field]
+              (let [^Schema field-schema (.schema field)
+                    type (.getType field-schema)]
+                (cond
+                  (= type Schema$Type/RECORD)
+                  (prefixed-names field-schema (.name field))
+
+                  ;; pig only supports unions of the form [null, <type>].
+                  (= type Schema$Type/UNION)
+                  (let [record-fields (->> (.getTypes field-schema)
+                                           (filter (fn [^Schema branch]
+                                                     (= (.getType branch) Schema$Type/RECORD)))
+                                           (map (fn [rec] (prefixed-names rec (.name field)))))]
+                    (if (empty? record-fields)
+                      [(.name field)]
+                      record-fields))
+
+                  :else (.name field)))))
+       flatten
+       vec))
+
+(defn- parse-schema
+  "Parses the avro schema given as a JSON string `s` and returns the Schema."
+  [^String s]
+  (.parse (Schema$Parser.) s))
+
+;; Converts avro Utf8 values to java.lang.String.
+(defmethod pigpen.pig/native->clojure org.apache.avro.util.Utf8 [value]
+  (str value))
+
+(defn dotted-keys->nested-map
+  "Builds a nested map with keyword keys from a seq of [key value] pairs whose
+  keys are dotted paths, e.g.
+
+    (dotted-keys->nested-map {\"a.b\" 1, \"c\" 2}) ;=> {:a {:b 1}, :c 2}
+
+  Pairs whose value is nil are omitted; false values are kept. Throws an
+  Exception describing the offending entry (with the original exception as its
+  cause) when a path cannot be assoc'ed into the map built so far."
+  [kvs]
+  (reduce (fn [acc [k v]]
+            (let [ks (str/split (name k) #"\.")]
+              (try
+                ;; Test for nil explicitly: a plain truthiness check would also
+                ;; drop fields whose value is false.
+                (if (nil? v)
+                  acc
+                  (assoc-in acc (map keyword ks) v))
+                (catch Exception e
+                  (throw (Exception.
+                           (str "can't assoc-in record: " acc "\nkeys: "
+                                ks "\nvalue: " v "\nkvs: " kvs)
+                           e))))))
+          {}
+          kvs))
+
+(defn load-avro
+  "*** ALPHA - Subject to change ***
+
+  Loads data from an avro file. Returns data as maps with keyword keys
+  corresponding to avro field names. Fields with avro type \"map\" will be maps
+  with string keys.
+
+  location is the path of the avro data to load; schema is the avro schema as a
+  JSON string.
+
+  Example:
+
+    ;; avro schemas are defined [on the project's website](http://avro.apache.org/docs/1.7.7/spec.html#schemas)
+    ;; load-avro takes the schema as a string.
+    (pig-avro/load-avro \"input.avro\" (slurp \"schemafile.json\"))
+
+    (pig-avro/load-avro \"input.avro\"
+       \"{\\\"namespace\\\": \\\"example.avro\\\", \\\"type\\\": \\\"record\\\", \\\"name\\\": \\\"foo\\\",
+         \\\"fields\\\": [{\\\"name\\\": \\\"wurdz\\\", \\\"type\\\": \\\"string\\\"}, {\\\"name\\\": \\\"bar\\\", \\\"type\\\": \\\"int\\\"}] }\")
+
+  Make sure a
+  [piggybank.jar](http://mvnrepository.com/artifact/org.apache.pig/piggybank/0.14.0)
+  compatible with your version of hadoop is on $PIG_CLASSPATH. For
+  hadoop v2, see http://stackoverflow.com/a/21753749. Amazon's elastic
+  mapreduce comes with a compatible piggybank.jar already on the
+  classpath.
+"
+  [location schema]
+  (let [^Schema parsed-schema (parse-schema schema)
+        storage (raw/storage$
+                  ["piggybank.jar"]
+                  "org.apache.pig.piggybank.storage.avro.AvroStorage"
+                  ["schema" (SchemaNormalization/toParsingForm parsed-schema)])
+        field-symbols (map symbol (field-names parsed-schema))]
+    (->
+      (raw/load$ location field-symbols storage {:implicit-schema true})
+      (raw/bind$
+        '[pigpen.avro.core]
+        '(pigpen.pig/map->bind (comp
+                                 pigpen.avro.core/dotted-keys->nested-map
+                                 (pigpen.pig/args->map pigpen.pig/native->clojure)))
+        {:args (clojure.core/mapcat (juxt str identity) field-symbols)
+         :field-type-in :native}))))
+
+(defn- attrs-lookup
+  "Follows attr-names, a seq of field names, down through nested avro records
+  starting at `record`. Returns the value at the end of the path, or nil as
+  soon as a nil is encountered along the way."
+  [^GenericData$Record record attr-names]
+  (cond
+    (nil? record) nil
+    (empty? attr-names) record
+    :else (recur (.get record ^String (first attr-names)) (rest attr-names))))
+
+;; Local-mode loader for locations stored with piggybank's AvroStorage.
+(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage"
+  [{:keys [location fields]}]
+  (reify PigPenLocalLoader
+    (locations [_]
+      (pigpen.local/load-list location))
+    (init-reader [_ filename]
+      (-> ^String filename (java.io.File.) (DataFileReader. (SpecificDatumReader.))))
+    (read [_ reader]
+      ;; Each datum becomes a map of field -> value, where a dotted field name
+      ;; is looked up as a path through nested records.
+      ;; NOTE(review): `for` is lazy; presumably the caller realizes the
+      ;; result before close-reader is called -- confirm.
+      (for [datum reader]
+        (zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields))))
+    (close-reader [_ reader]
+      (.close ^java.io.Closeable reader))))
diff --git a/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj
new file mode 100644
index 0000000..235d7b5
--- /dev/null
+++ b/pigpen-avro/src/test/clojure/pigpen/functional/avro_test.clj
@@ -0,0 +1,82 @@
+;;
+;;
+;;  Copyright 2013 Netflix, Inc.
+;;
+;;     Licensed under the Apache License, Version 2.0 (the "License");
+;;     you may not use this file except in compliance with the License.
+;;     You may obtain a copy of the License at
+;;
+;;         http://www.apache.org/licenses/LICENSE-2.0
+;;
+;;     Unless required by applicable law or agreed to in writing, software
+;;     distributed under the License is distributed on an "AS IS" BASIS,
+;;     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;;     See the License for the specific language governing permissions and
+;;     limitations under the License.
+;;
+;;
+
+(ns pigpen.functional.avro-test
+  (:require [clojure.test :refer :all]
+            [pigpen.extensions.test :refer [test-diff pigsym-zero pigsym-inc regex->string]] ; NOTE(review): none of these four are used in this file
+            [pigpen.core :as pig]
+            [pigpen.fold :as fold]
+            [pigpen.avro.core :as pig-avro]))
+
+(.mkdirs (java.io.File. "build/functional/avro-test/test-avro")) ; NOTE(review): no test below reads or writes this directory -- confirm it is needed
+
+(def clj-data ; records expected from resources/example_data.avro (compared in test-avro)
+  [{:browserTimestamp 1417997369042,
+    :requestSpan
+    {:spanId "2873f846-c867-4e08-8aa8-530f63bbee2b",
+     :parentSpanId "0baef555-47ef-48a3-b218-6bfd33094ecd",
+     :metadata {:schema_id "7d8d87ef3315c672a28b6ae2d31bd9fa"}},
+    :metadata {:schema_id "e48b9786fd4598fa27c1de354f735"},
+    :rawHash
+    {"queryParams" "",
+     "rallyRequestId"
+     "qs-app-04144pliuzkfbmu1aiwzrecc81s5.qs-app-049893092",
+     "uId" "14125918269",
+     "bNo" "15959",
+     "sId" "12850"}}
+   {:browserTimestamp 1417997368078,
+    :requestSpan
+    {:spanId "3e313af4-76b8-48cf-abc8-92033f225126",
+     :parentSpanId "7673ed70-cfd0-4fb3-9722-33ae8a977301",
+     :metadata {:schema_id "7d8d87ef3315c672a28b6ae2d31bd9fa"}},
+    :metadata {:schema_id "e48b9786fd4598fa27c1de354f735"},
+    :rawHash
+    {"queryParams" "",
+     "uId" "14125918269",
+     "bNo" "15959",
+     "sId" "12850"}}
+   {:browserTimestamp 1417997392079,
+    :requestSpan
+    {:spanId "3e313af4-76b8-48cf-abc8-92033f225126",
+     :metadata {:schema_id "7d8d87ef3315c672a28b6ae2d31bd9fa"}},
+    :metadata {:schema_id "e48b9786fd4598fa27c1de354f735"},
+    :rawHash
+    {"queryParams" "",
+     "uId" "14125918269",
+     "bNo" "15959",
+     "sId" "12850"}
+    :panel {:defOid 29991883477}}])
+
+
+;; resources/example_data.avro was generated from example_data.json with:
+;; $ java -jar ~/avro-tools-1.7.7.jar fromjson --codec snappy \
+;;     --schema-file example_schema.avsc example_data.json > example_data.avro
+(deftest test-avro
+  (let [query (pig-avro/load-avro
+               "resources/example_data.avro" (slurp "resources/example_schema.avsc"))]
+    (is (= (pig/dump query) clj-data))))
+
+(deftest test-fold
+  (let [query (->>
+               (pig-avro/load-avro
+                "resources/example_data.avro" (slurp "resources/example_schema.avsc"))
+               (pig/map #(get % :browserTimestamp 0))
+               (pig/fold (fold/sum)))]
+    (is (= (pig/dump query) [4253992129199])))) ; sum of the three :browserTimestamp values in clj-data
+
+(comment (run-tests))
diff --git a/settings.gradle b/settings.gradle
index 238031f..2988181 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,3 +1,3 @@
 rootProject.name='pigpen-multi'
-include 'pigpen-core','pigpen-parquet'
+include 'pigpen-core','pigpen-parquet','pigpen-avro'
 project(':pigpen-core').name = 'pigpen'