
Add functions for zipping tables and wiring them on CartoDB.

1 parent ed1e2ee commit 219a420d4d2bb30f7897011100cbe8c7ee6fe4db @eightysteele committed Jun 20, 2012
Showing with 95 additions and 20 deletions.
  1. +3 −1 project.clj
  2. +68 −8 src/clj/gulo/cdb.clj
  3. +20 −5 src/clj/gulo/core.clj
  4. +4 −6 src/clj/gulo/harvest.clj
project.clj
@@ -16,7 +16,9 @@
[dwca-reader-clj "0.3.0-SNAPSHOT"]
[cartodb-clj "1.1.1-SNAPSHOT"]
[org.clojure/data.csv "0.1.2"]
- [clj-http "0.4.3"]]
+ [clj-http "0.4.3"]
+ [net.lingala.zip4j/zip4j "1.3.1"]
+ [com.google.guava/guava "12.0"]]
:dev-dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]
[midje-cascalog "0.4.0"]
[midje "1.4.0"]])
src/clj/gulo/cdb.clj
@@ -2,13 +2,23 @@
"This namespace contains CartoDB functions for setting up database tables."
(:use [gulo.util]
[clojure.data.json :only (read-json)]
- [clojure.string :only (join)])
+ [clojure.string :only (join)]
+ [clojure.contrib.shell-out :only (sh)])
(:require [clojure.java.io :as io]
- [cartodb.client :as cdb]))
+ [cartodb.client :as cdb])
+ (:import [com.google.common.io Files]
+ [com.google.common.base Charsets]
+ [java.io File FileInputStream]
+ [net.lingala.zip4j.core ZipFile]
+ [net.lingala.zip4j.model ZipParameters]
+ [net.lingala.zip4j.util Zip4jConstants]
+ [net.lingala.zip4j.exception ZipException]))
+;; Slurps resources/creds.json for OAuth: {"key" "secret" "user" "password"}
(def creds (read-json (slurp (io/resource "creds.json"))))
(defn- drop-column
+ "Drop the supplied column from the supplied table or just return the SQL."
[table column & {:keys [cascade execute account]
:or {cascade false execute true account nil}}]
(let [account (if (not account) (:user creds) account)
@@ -17,6 +27,7 @@
(if execute (cdb/query sql account :oauth creds) sql)))
(defn- drop-table
+ "Drop the supplied table or just return the SQL."
[table & {:keys [execute account]
:or {execute true account nil}}]
(let [tables (if (coll? table) (join "," table) table)
@@ -25,6 +36,7 @@
(if execute (cdb/query sql account :oauth creds) sql)))
(defn- create-index
+ "Create index on table column or just return the SQL."
[table column index & {:keys [unique execute account]
:or {unique false execute true account nil}}]
(let [account (if (not account) (:user creds) account)
@@ -44,12 +56,60 @@
account (:user creds)]
(if execute (cdb/query sql account :oauth creds) sql)))
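Note: drop-column, drop-table, and create-index all take an :execute flag, so passing :execute false returns the generated SQL rather than running it through cdb/query. A minimal REPL sketch (the exact SQL string is an assumption, since the diff elides its construction):

    (create-index "occ" "occ_id" "occ_occ_id_idx" :unique true :execute false)
    ;; => presumably something like "CREATE UNIQUE INDEX occ_occ_id_idx ON occ (occ_id)"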
-(defn wire-occ-table
- [& {:keys [delete] :or {delete false}}]
- (if delete (drop-table "occ"))
- (create-occ-table)
- ;;(drop-column "occ" "the_geom" :cascade true)
- ;;(drop-column "occ" "the_geom_webmercator" :cascade true)
+(defn- wire-occ-table
+ "Index the occ table, optionally dropping its geometry column first."
+ [& {:keys [delete drop-geom] :or {delete false drop-geom false}}]
+ (if drop-geom (drop-column "occ" "the_geom" :cascade true))
(create-index "occ" "occ_id" "occ_occ_id_idx" :unique true)
(create-index "occ" "tax_loc_id" "occ_tax_loc_id_idx"))
+(defn- wire-tax-table
+ "Index the tax table, optionally dropping its geometry column first."
+ [& {:keys [delete drop-geom] :or {delete false drop-geom false}}]
+ (if drop-geom (drop-column "tax" "the_geom" :cascade true))
+ (create-index "tax" "tax_id" "tax_tax_id_idx" :unique true)
+ (create-index "tax" "name" "tax_name_idx" :unique true))
+
+(defn- wire-loc-table
+ "Index the loc table."
+ [& {:keys [delete drop-geom] :or {delete false drop-geom false}}]
+ (create-index "loc" "loc_id" "loc_loc_id_idx" :unique true))
+
+(defn- wire-tax-loc-table
+ "Index the tax_loc table, optionally dropping its geometry column first."
+ [& {:keys [delete drop-geom] :or {delete false drop-geom false}}]
+ (if drop-geom (drop-column "tax_loc" "the_geom" :cascade true))
+ (create-index "tax_loc" "tax_loc_id" "tax_loc_tax_loc_id_idx" :unique true)
+ (create-index "tax_loc" "tax_id" "tax_loc_tax_id_idx")
+ (create-index "tax_loc" "loc_id" "tax_loc_loc_id_idx"))
+
+(defn prepare-zip
+ "Copy the table CSV to out-path, prepend a header row, zip it, and return the ZIP path."
+ [table-name table-cols path out-path]
+ (let [file-path (str out-path "/" table-name ".csv")
+ zip-path (str out-path "/" table-name ".zip")]
+ (Files/copy (File. path) (File. file-path))
+ ;; TODO: This sh is brittle business:
+ (sh "sed" "-i" (str "1i " (join \tab table-cols) ) file-path) ;; Add header to file
+ (sh "zip" "-j" "-r" "-D" zip-path file-path)
+ zip-path))
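With zip4j and Guava now on the classpath (and imported in the ns declaration above), the brittle sed/zip shell-outs could be replaced with JVM calls. A minimal, untested sketch under that assumption (prepare-zip* is a hypothetical name, not part of this commit):

    (defn- prepare-zip*
      [table-name table-cols path out-path]
      (let [file (File. (str out-path "/" table-name ".csv"))
            zip-path (str out-path "/" table-name ".zip")
            params (doto (ZipParameters.)
                     (.setCompressionMethod Zip4jConstants/COMP_DEFLATE)
                     (.setCompressionLevel Zip4jConstants/DEFLATE_LEVEL_NORMAL))]
        (Files/copy (File. path) file)
        ;; Prepend the header row in memory instead of shelling out to sed:
        (Files/write (str (join \tab table-cols) "\n"
                          (Files/toString file Charsets/UTF_8))
                     file Charsets/UTF_8)
        ;; zip4j creates zip-path if it does not already exist:
        (.addFile (ZipFile. zip-path) file params)
        zip-path))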
+
+(defn prepare-tables
+ "Prepare zipped CSVs for the occ, tax, loc, and tax-loc tables."
+ []
+ (let [sink "/mnt/hgfs/Data/vertnet/gulo/tables"
+ occ-source "/mnt/hgfs/Data/vertnet/gulo/hfs/occ/part-00000"
+ tax-source "/mnt/hgfs/Data/vertnet/gulo/hfs/tax/part-00000"
+ loc-source "/mnt/hgfs/Data/vertnet/gulo/hfs/loc/part-00000"
+ tax-loc-source "/mnt/hgfs/Data/vertnet/gulo/hfs/tax-loc/part-00000"]
+ (prepare-zip "occ" occ-columns occ-source sink)
+ (prepare-zip "tax" ["tax_id" "name"] tax-source sink)
+ (prepare-zip "loc" ["loc_id" "lat" "lon"] loc-source sink)
+ (prepare-zip "tax-loc" ["tax_loc_id" "tax_id" "loc_id"] tax-loc-source sink)))
+
+(defn wire-tables
+ "Wire the occ, tax, loc, and tax_loc tables on CartoDB."
+ []
+ (wire-occ-table :drop-geom true)
+ (wire-tax-table :drop-geom true)
+ (wire-loc-table)
+ (wire-tax-loc-table :drop-geom true))
+
+(comment
+ ;; After harvest and MapReduce steps:
+ (prepare-tables)
+ ;; Then for now manually upload ZIPs to CartoDB and finally:
+ (wire-tables))
src/clj/gulo/core.clj
@@ -31,8 +31,8 @@
(tax-loc-uuid ?tax-loc-uuid ?taxon-id ?loc-id)
(occ-tab :>> fields))
(?<- tax-loc-sink
- [?taxon-id ?loc-id]
- (tax-loc ?taxon-id ?loc-id))))
+ [?tax-loc-id ?taxon-id ?loc-id]
+ (tax-loc-uuid ?tax-loc-id ?taxon-id ?loc-id))))
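(The fix above makes the tax-loc sink emit the generated ?tax-loc-id UUID alongside the taxon and location ids, so the uploaded tax_loc table carries the tax_loc_id column that the unique index created by wire-tax-loc-table in cdb.clj expects.)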
(defmapcatop map-names
"Emits all taxon names."
@@ -73,6 +73,21 @@
(uniques ?lat ?lon)
(util/gen-uuid :> ?uuid))))
-(defmain BuildTables
- ;; TODO
- )
+(comment
+ ;; First harvest the archives:
+ (harvest ["http://vertnet.nhm.ku.edu:8080/ipt/archive.do?r=nysm_mammals"]
+ "/mnt/hgfs/Data/vertnet/gulo/harvest/data.csv")
+
+ ;; Then MapReduce to build the tables:
+ (location-table (taps/hfs-delimited "/mnt/hgfs/Data/vertnet/gulo/harvest/data.csv")
+ "/mnt/hgfs/Data/vertnet/gulo/hfs/loc")
+
+ (taxon-table (taps/hfs-delimited "/mnt/hgfs/Data/vertnet/gulo/harvest/data.csv")
+ "/mnt/hgfs/Data/vertnet/gulo/hfs/tax")
+
+ (occ-tax-loc
+ "/mnt/hgfs/Data/vertnet/gulo/harvest/data.csv"
+ "/mnt/hgfs/Data/vertnet/gulo/hfs/tax/part-00000"
+ "/mnt/hgfs/Data/vertnet/gulo/hfs/loc/part-00000"
+ "/mnt/hgfs/Data/vertnet/gulo/hfs/occ"
+ "/mnt/hgfs/Data/vertnet/gulo/hfs/tax-loc"))
src/clj/gulo/harvest.clj
@@ -13,7 +13,8 @@
(defn dwca-urls
"Return vector of Darwin Core Archive URLs as strings."
[]
- (vec (map #(first (vals %)) (cdb/query "vertnet" "SELECT dwca_url FROM publishers"))))
+ (vec (map #(first (vals %)) (cdb/query "SELECT dwca_url FROM publishers" "vertnet"))))
(defn prepend-uuid
"Return vector of supplied DarwinCoreRecord values with a UUID prepended."
@@ -29,8 +30,5 @@
(defn harvest
"Harvest Darwin Core Archives from URLs into a tab delimited file at path."
- [& {:keys [urls path]
- :or {urls (dwca-urls) path (str (->> (Files/createTempDir) .getPath) "/dwc.csv")}}]
- (println path)
- (map (partial url->csv path) urls))
-
+ [urls path]
+ (map (partial url->csv path) urls))
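One caveat: map is lazy, so if harvest is called purely for its side effects the downloads may never be forced. A minimal sketch of an eager variant using the standard doall idiom (not part of this commit):

    (defn harvest
      "Harvest Darwin Core Archives from URLs into a tab delimited file at path."
      [urls path]
      (doall (map (partial url->csv path) urls)))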
