Skip to content

Commit

Permalink
Merge pull request #101 from theJohnnyBrown/avro
Browse files Browse the repository at this point in the history
Avro Loader
  • Loading branch information
mbossenbroek committed Jan 14, 2015
2 parents 3474b8f + ddf99d7 commit b030ccd
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 1 deletion.
15 changes: 15 additions & 0 deletions pigpen-avro/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
dependencies {
compile project(':pigpen')

provided 'org.clojure:clojure:1.6.0'

compile 'org.apache.avro:avro:1.7.7'
compile 'com.googlecode.json-simple:json-simple:1.1.1'
compile 'org.xerial.snappy:snappy-java:1.0.4.1'


provided 'org.apache.pig:pig:0.11.1'
provided 'org.apache.hadoop:hadoop-core:1.1.2'
provided 'org.antlr:antlr:3.5.2'
provided 'log4j:log4j:1.2.17'
}
Binary file added pigpen-avro/resources/example_data.avro
Binary file not shown.
67 changes: 67 additions & 0 deletions pigpen-avro/resources/example_data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"browserTimestamp": 1417997369042,
"rawHash": {
"uId": "14125918269",
"bNo": "15959",
"rallyRequestId": "qs-app-04144pliuzkfbmu1aiwzrecc81s5.qs-app-049893092",
"queryParams": "",
"sId": "12850"
},
"metadata": {
"schema_id": "e48b9786fd4598fa27c1de354f735"
},
"requestSpan": {
"metadata": {
"schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa"
},
"unhandledException": {"null": null},
"handledExceptions": {"null": null},
"spanId": "2873f846-c867-4e08-8aa8-530f63bbee2b",
"parentSpanId": {"string": "0baef555-47ef-48a3-b218-6bfd33094ecd"}
},
"panel": null
}
{
"browserTimestamp": 1417997368078,
"rawHash": {
"uId": "14125918269",
"bNo": "15959",
"queryParams": "",
"sId": "12850"
},
"metadata": {
"schema_id": "e48b9786fd4598fa27c1de354f735"
},
"requestSpan": {
"metadata": {
"schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa"
},
"unhandledException": {"null": null},
"handledExceptions": {"null": null},
"spanId": "3e313af4-76b8-48cf-abc8-92033f225126",
"parentSpanId": {"string": "7673ed70-cfd0-4fb3-9722-33ae8a977301"}
},
"panel": {"null": null }
}
{
"browserTimestamp": 1417997392079,
"rawHash": {
"uId": "14125918269",
"bNo": "15959",
"queryParams": "",
"sId": "12850"
},
"metadata": {
"schema_id": "e48b9786fd4598fa27c1de354f735"
},
"requestSpan": {
"metadata": {
"schema_id": "7d8d87ef3315c672a28b6ae2d31bd9fa"
},
"unhandledException": {"null": null},
"handledExceptions": {"null": null},
"spanId": "3e313af4-76b8-48cf-abc8-92033f225126",
"parentSpanId": null
},
"panel": {"com.example.PanelInfo":{"defOid": {"long": 29991883477}}}
}
114 changes: 114 additions & 0 deletions pigpen-avro/resources/example_schema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
{
"type": "record",
"name": "ExampleRecord",
"namespace": "com.example",
"fields": [
{
"name": "browserTimestamp",
"type": "long"
},
{
"name": "rawHash",
"type": {
"type": "map",
"values": "string"
}
},
{
"name": "metadata",
"type": {
"type": "record",
"name": "SharedRecordSchema",
"namespace": "com.example",
"fields": [
{
"name": "schema_id",
"type": "string"
}
]
}
},
{
"name": "requestSpan",
"type": {
"type": "record",
"name": "ExampleNestedRecord",
"fields": [
{
"name": "metadata",
"type": "com.example.SharedRecordSchema"
},
{
"name": "spanId",
"type": "string"
},
{
"name": "parentSpanId",
"type": [
"null",
"string"
]
},
{
"name": "unhandledException",
"type": [
"null",
{
"type": "record",
"name": "Exception",
"fields": [
{
"name": "exceptionClass",
"type": "string"
},
{
"name": "exceptionMessage",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "exceptionStack",
"type": "string"
}
]
}
]
},
{
"name": "handledExceptions",
"type": [
"null",
{
"type": "array",
"items": "Exception"
}
]
}
]
}
},
{
"name": "panel",
"type": [
"null",
{
"type": "record",
"name": "PanelInfo",
"fields": [
{
"name": "defOid",
"type": [
"null",
"long"
],
"default": null
}
]
}
]
}
]
}
128 changes: 128 additions & 0 deletions pigpen-avro/src/main/clojure/pigpen/avro/core.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
;;
;;
;; Copyright 2013 Netflix, Inc.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
;;
;;

(ns pigpen.avro.core
(:require [clojure.string :as str]
[pigpen.raw :as raw]
[pigpen.core :as pig])
(:import [org.apache.avro
Schema
Schema$Field
Schema$Type
Schema$Parser
SchemaNormalization]
[org.apache.avro.file DataFileReader]
[org.apache.avro.specific SpecificDatumReader]
[org.apache.avro.generic GenericData$Record]
[pigpen.local PigPenLocalLoader]))

(set! *warn-on-reflection* true)

(declare field-names)
(defn- prefixed-names [record-schema prefix]
(for [subfield (field-names record-schema)]
(str prefix "." subfield)))

(defn- field-names [^Schema parsed-schema]
(->> (.getFields parsed-schema)
(map (fn [^Schema$Field field]
(let [type (.getType ^{:tag Schema} (.schema field))]
(cond
(= type Schema$Type/RECORD) (prefixed-names (.schema field) (.name field))
;; pig only supports union of null, <type>.
(= type Schema$Type/UNION) (let [record-fields (->> (.getTypes (.schema field))
(filter #(= (.getType ^{:tag Schema} %) Schema$Type/RECORD))
(map (fn [rec] (prefixed-names rec (.name field)))))]
(if (empty? record-fields) [(.name field)]
record-fields))
:else (.name field)))))
flatten
vec))

(defn- parse-schema [^String s]
(.parse ^{:tag Schema$Parser} (Schema$Parser.) s))

(defmethod pigpen.pig/native->clojure org.apache.avro.util.Utf8 [value]
(str value))

(defn dotted-keys->nested-map [kvs]
(->> kvs
(map (fn [[k v]] [(-> k name (clojure.string/split #"\.")) v]))
(reduce (fn [acc [ks v]]
(try
(if v (assoc-in acc (map keyword ks) v) acc)
(catch Exception e
(throw (Exception.
(str "can't assoc-in record: " acc "\nkeys: "
ks "\nvalue: " v "\nkvs: " kvs)))))) {})))

(defn load-avro
"*** ALPHA - Subject to change ***
Loads data from an avro file. Returns data as maps with keyword keys corresponding to avro field names. Fields with avro type \"map\" will be maps with string keys.
Example:
;; avro schemas are defined [on the project's website](http://avro.apache.org/docs/1.7.7/spec.html#schemas)
;; load-avro takes the schema as a string.
(pig-avro/load-avro \"input.avro\" (slurp \"schemafile.json\"))
(pig-avro/load-avro \"input.avro\"
\"{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"foo\",
\"fields\": [{\"name\": \"wurdz\", \"type\": \"string\"}, {\"name\": \"bar\", \"type\": \"int\"}] }\")
Make sure a
[piggybank.jar](http://mvnrepository.com/artifact/org.apache.pig/piggybank/0.14.0)
compatible with your version of hadoop is on $PIG_CLASSPATH. For
hadoop v2, see http://stackoverflow.com/a/21753749. Amazon's elastic
mapreduce comes with a compatible piggybank.jar already on the
classpath.
"
([location schema]
(let [^Schema parsed-schema (parse-schema schema)
storage (raw/storage$
["piggybank.jar"]
"org.apache.pig.piggybank.storage.avro.AvroStorage"
["schema" (SchemaNormalization/toParsingForm parsed-schema)])
field-symbols (map symbol (field-names parsed-schema))]
(->
(raw/load$ location field-symbols storage {:implicit-schema true})
(raw/bind$
'[pigpen.avro.core]
'(pigpen.pig/map->bind (comp
pigpen.avro.core/dotted-keys->nested-map
(pigpen.pig/args->map pigpen.pig/native->clojure)))
{:args (clojure.core/mapcat (juxt str identity) field-symbols)
:field-type-in :native })))))

(defn- attrs-lookup [^GenericData$Record record attr-names]
(cond
(nil? record) nil
(empty? attr-names) record
:else (attrs-lookup (.get record ^{:tag java.lang.String} (first attr-names)) (rest attr-names))))

(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage"
[{:keys [location fields] }]
(reify PigPenLocalLoader
(locations [_] (pigpen.local/load-list location))
(init-reader [_ filename]
(-> ^java.lang.String filename (java.io.File.) (DataFileReader. (SpecificDatumReader.))))
(read [_ reader]
(for [datum ^SpecificDatumReader reader]
(zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields))))
(close-reader [_ reader] (.close ^{:tag java.io.Closeable} reader))))

0 comments on commit b030ccd

Please sign in to comment.