
Custom Loaders

Matt Bossenbroek edited this page Dec 10, 2015 · 5 revisions

If you have an existing storage format that you want to read from PigPen, here's how to do it.

For writing data, see Custom Storage.

Essentially, we wrap the custom storage in a new PigPen command. The heavy commenting below makes the example look long, but there's very little actual code.

(ns pigpen.custom-loader
  (:require [pigpen.core.op :as op]))

(defn load-foo
  ([location] (load-foo location {}))
  ([location opts] ;; you can add any other params here, like the args or field list
    (->>
      (op/load$               ;; this is a raw Pig load command
        location              ;; the location of the data - this should be a string
        :my-custom-storage    ;; this indicates the type of storage to use
        '[name address phone] ;; these are the fields this loader returns
        opts)                 ;; just pass the opts through
      (op/bind$ ;; All PigPen commands work with a single binary field
                ;; called 'value. The bind command converts your three fields
                ;; into a single serialized value that works with the rest of
                ;; PigPen.
        '[pigpen.core.op]     ;; require this namespace at runtime
        '(pigpen.core.op/map->bind vector) ;; vector is the function we use to combine
                                           ;; the three fields - you can use any
                                           ;; function that takes the raw input fields
                                           ;; and returns a single value.
                                           ;; See http://netflix.github.io/PigPen/pigpen.core.op.html#var-bind
        {:args '[name address phone]  ;; tell it what input fields to use
         :field-type-in :native}))))  ;; tell it that the input isn't serialized

The code above does two things: it loads the data with our custom loader, and then transforms each record into a PigPen/Clojure-friendly format.
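To make the bind$ step more concrete, here is a plain-Clojure sketch of the per-record transformation it asks for: take the fields named in :args from each raw record, in order, and pass them to the combining function (vector in the example above). This only models the intent of map->bind; it is not PigPen's implementation, and the sample rows and the bind-fields helper are hypothetical.

```clojure
(ns example.bind-sketch)

;; Hypothetical raw records, as the custom storage might produce them:
;; one map per record, keyed by the field symbols passed to load$.
(def raw-rows
  [{'name "Alice" 'address "1 Main St" 'phone "555-0100"}
   {'name "Bob"   'address "2 Oak Ave" 'phone "555-0199"}])

(defn bind-fields
  "Applies f to the values of the fields named in args (in that order)
  for each row. A model of the :args / map->bind idea, not PigPen code."
  [f args rows]
  (map (fn [row] (apply f (map row args))) rows))

;; With vector, each record becomes a 3-element vector.
(bind-fields vector '[name address phone] raw-rows)
;; => (["Alice" "1 Main St" "555-0100"] ["Bob" "2 Oak Ave" "555-0199"])

;; Any function of the raw fields works, e.g. one that builds a keyword map.
(bind-fields (fn [n a p] {:name n :address a :phone p})
             '[name address phone]
             raw-rows)
```

Because vector turns each record into a three-element vector, applying `count` to each record (as the final example on this page does) yields 3 for every record.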

The next step is to provide a local implementation for our custom loader. This step is only required if you want to use the new loader locally (in the REPL). We do this by implementing the pigpen.local/load multimethod, which returns an implementation of the PigPenLocalLoader protocol:

(require '[pigpen.local :refer [PigPenLocalLoader]])

(defmethod pigpen.local/load :my-custom-storage
  [{:keys [location fields]}]
  (reify PigPenLocalLoader
    (locations [_]
      ;; return a list of files or partitions to load
      )
    (init-reader [_ file]
      ;; for a single file, initialize a reader
      )
    (read [_ reader]
      ;; return a sequence of records from the reader. Each should be a map with
      ;; keys from `fields`
      )
    (close-reader [_ reader]
      ;; close the reader created by `init-reader`
      )))
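The page doesn't specify the format behind :my-custom-storage, so as a purely hypothetical illustration, suppose each location is either a single file or a directory of files containing tab-delimited text, one record per line, with columns in the same order as `fields`. The helpers below (list-locations, parse-line and read-records are hypothetical names) are plain Clojure that the `locations` and `read` methods could delegate to.

```clojure
(ns example.local-loader-sketch
  (:require [clojure.java.io :as io]
            [clojure.string :as str]))

(defn list-locations
  "Returns the files to load: every regular file in `location` if it is a
  directory (sorted by path), otherwise just `location` itself."
  [location]
  (let [f (io/file location)]
    (if (.isDirectory f)
      (->> (.listFiles f)
           (filter #(.isFile ^java.io.File %))
           (map str)
           sort)
      [location])))

(defn parse-line
  "Splits one tab-delimited line into a map keyed by `fields`. The -1 limit
  keeps trailing empty columns instead of dropping them."
  [fields line]
  (zipmap fields (str/split line #"\t" -1)))

(defn read-records
  "Parses every line from `reader` (a java.io.BufferedReader). The result is
  fully realized with mapv, so it doesn't depend on the reader staying open."
  [fields reader]
  (mapv #(parse-line fields %) (line-seq reader)))
```

Under these assumptions, `locations` could return `(list-locations location)`, `init-reader` could return `(clojure.java.io/reader file)`, `read` could return `(read-records fields reader)`, and `close-reader` could call `(.close reader)`.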

Next, we provide a Pig script implementation of our loader. This multimethod returns the loader expression that PigPen uses when it generates a Pig script:

(require '[pigpen.pig.script])
(require '[clojure.string])

(defmethod pigpen.pig.script/storage->script [:load :my-custom-storage]
  [{:keys [fields]}]
  (let [field-expr (->> fields
                     (map name)
                     (clojure.string/join ", "))]
    ;; return the Pig loader to use in the script
    (str "my.custom.Storage('arg') AS (" field-expr ")")))
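To check what this method produces for the fields declared in load-foo, the string-building logic can be pulled out into a standalone function (load-expr is a hypothetical name) and called without Pig. The result has the shape of the `USING ... AS (...)` part of a Pig Latin LOAD statement. The typed variant uses Pig's `field:type` schema syntax and assumes, hypothetically, that my.custom.Storage returns every field as a string; whether a typed schema suits your loader depends on what it actually returns.

```clojure
(ns example.pig-script-sketch
  (:require [clojure.string :as str]))

(defn load-expr
  "Same string-building logic as the storage->script method above."
  [fields]
  (str "my.custom.Storage('arg') AS ("
       (str/join ", " (map name fields))
       ")"))

(load-expr '[name address phone])
;; => "my.custom.Storage('arg') AS (name, address, phone)"

(defn typed-load-expr
  "Variant that declares every field as chararray (assumes string fields)."
  [fields]
  (str "my.custom.Storage('arg') AS ("
       (str/join ", " (map #(str (name %) ":chararray") fields))
       ")"))

(typed-load-expr '[name address phone])
;; => "my.custom.Storage('arg') AS (name:chararray, address:chararray, phone:chararray)"
```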

If Pig needs to load any additional jars, you can specify them with this multimethod:

(require '[pigpen.pig.oven])

(defmethod pigpen.pig.oven/storage->references :my-custom-storage
  [_]
  ["storage.jar"])

And we're done! Now we can use our new command just like any other:

(require '[pigpen.core :as pig])

(defn use-my-storage []
  (->>
    (load-foo "input.foo")
    (pig/map count)
    (pig/dump)))