Skip to content

Commit

Permalink
upgrade to cascading 1.2, optimize distinct algorithm. big performanc…
Browse files Browse the repository at this point in the history
…e improvements
  • Loading branch information
nathanmarz committed Dec 1, 2010
1 parent ba9a2a3 commit df2fa15
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 18 deletions.
4 changes: 2 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
(defproject cascalog/cascalog "1.5.0-SNAPSHOT"
(defproject cascalog/cascalog "1.5.0"
:source-path "src/clj"
:java-source-path "src/jvm"
:java-fork "true"
:javac-debug "true"
:hooks [leiningen.hooks.javac]
:dependencies [[org.clojure/clojure "1.2.0"]
[org.clojure/clojure-contrib "1.2.0"]
[cascading1.1 "1.1.3-SNAPSHOT"]
[cascading/cascading-core "1.2-wip-63" :exclusions [org.codehaus.janino/janino]]
[log4j/log4j "1.2.16"]
]
:repositories {"conjars" "http://conjars.org/repo/"}
Expand Down
3 changes: 1 addition & 2 deletions src/clj/cascalog/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@

(defn construct
[outvars preds]
"Construct a query programatically. When constructing queries this way, operations should either be vars for operations or values
defined using one of Cascalog's def macros."
"Construct a query programatically. When constructing queries this way, operations should either be vars for operations or values defined using one of Cascalog's def macros."
(let [outvars (vars2str outvars)
preds (for [[p & vars] preds] [p nil (vars2str vars)])]
(cascalog.rules/build-rule outvars preds)
Expand Down
16 changes: 8 additions & 8 deletions src/clj/cascalog/ops.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
(defparallelagg count :init-var #'one
:combine-var #'+)

(defparallelagg sum :init-var #'identity
:combine-var #'+)
;; Parallel aggregator `sum`: :init-var is identity-tuple and :combine-var is
;; +-all, both variadic, so several fields can be summed in one predicate.
;; NOTE(review): defparallelagg is defined outside this view -- presumably the
;; init fn builds the per-tuple state and the combine fn merges two states; confirm.
(defparallelagg sum :init-var #'identity-tuple
:combine-var #'+-all)

(defparallelagg min :init-var #'identity
:combine-var #'clojure.core/min)
;; Parallel aggregator `min`: :init-var is identity-tuple and :combine-var is
;; min-all, both variadic, so several fields can be minimized in one predicate.
;; NOTE(review): defparallelagg is defined outside this view -- presumably the
;; init fn builds the per-tuple state and the combine fn merges two states; confirm.
(defparallelagg min :init-var #'identity-tuple
:combine-var #'min-all)

(defparallelagg max :init-var #'identity
:combine-var #'clojure.core/max)
;; Parallel aggregator `max`: :init-var is identity-tuple and :combine-var is
;; max-all, both variadic, so several fields can be maximized in one predicate.
;; NOTE(review): defparallelagg is defined outside this view -- presumably the
;; init fn builds the per-tuple state and the combine fn merges two states; confirm.
(defparallelagg max :init-var #'identity-tuple
:combine-var #'max-all)

(defparallelagg !count :init-var #'existence-int
:combine-var #'+)
;; Parallel aggregator `!count`: :init-var is existence-int-all (1 for a truthy
;; argument, 0 otherwise) and :combine-var is +-all, which adds states element-wise.
;; NOTE(review): presumably this counts non-nil values per field -- confirm
;; against defparallelagg, which is defined outside this view.
(defparallelagg !count :init-var #'existence-int-all
:combine-var #'+-all)

(defparallelbuf limit :hof? true
:init-hof-var #'limit-init
Expand Down
22 changes: 21 additions & 1 deletion src/clj/cascalog/ops_impl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,27 @@

(defn one [] 1)

(defn existence-int [v] (if v 1 0))
;; Variadic: maps every argument to 1 when it is truthy and to 0 when it is
;; nil or false, producing one result per argument.
;; NOTE(review): `dofor` is defined outside this view -- presumably an eager
;; (doall (for ...)); confirm before relying on the result being realized.
(defn existence-int-all [& tuple] (dofor [v tuple] (if v 1 0)))

(defn identity-tuple
  "Returns its arguments unchanged, collected into a single seq.
  Yields nil when called with no arguments."
  [& tuple]
  tuple)

(defn split-tuple
  "Splits `all` at index (count all)/2 and returns a two-element vector
  [front back] of lazy seqs.
  Meant for even-length input; an odd count makes the split index a Ratio,
  which is passed to split-at as-is and not handled specially here."
  [all]
  (split-at (/ (count all) 2) all))

(defn symmetric-split-tuple-op
  "Halves `all` with split-tuple and applies `afn` pairwise across the halves:
  the i-th element of the first half is combined with the i-th element of the
  second half. The resulting seq is fully realized before it is returned."
  [afn all]
  ;; split-tuple yields exactly two seqs, so `apply` hands them to map as the
  ;; two collections to walk in lockstep.
  (doall (apply map afn (split-tuple all))))

(defn +-all
  "Treats its arguments as two tuples laid end to end and returns their
  element-wise sums, e.g. (+-all 1 2 10 20) => (11 22)."
  [& all]
  (symmetric-split-tuple-op + all))

(defn min-all
  "Treats its arguments as two tuples laid end to end and returns the
  element-wise minimum of the two, e.g. (min-all 1 20 10 2) => (1 2)."
  [& all]
  (symmetric-split-tuple-op min all))

(defn max-all
  "Treats its arguments as two tuples laid end to end and returns the
  element-wise maximum of the two, e.g. (max-all 1 20 10 2) => (10 20)."
  [& all]
  (symmetric-split-tuple-op max all))

(defn limit-init [options limit]
(fn [sort-tuple & tuple]
Expand Down
4 changes: 2 additions & 2 deletions src/clj/cascalog/playground.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
'(do
(use (quote [clojure.contrib.seq-utils :only [find-first]]))
(use (quote cascalog.api))
(require (quote [cascalog [workflow :as w] [ops :as c]]))
(require (quote [cascalog [workflow :as w] [ops :as c] [vars :as v]]))
))

(defmacro bootstrap-emacs []
'(do
(use (quote [clojure.contrib.seq-utils :only [find-first]]))
(use (quote cascalog.api))
(require (quote [cascalog [workflow :as w] [ops :as c]]))
(require (quote [cascalog [workflow :as w] [ops :as c] [vars :as v]]))

(import (quote [java.io PrintStream]))
(import (quote [cascalog WriterOutputStream]))
Expand Down
3 changes: 1 addition & 2 deletions src/clj/cascalog/predicate.clj
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@
(defpredicate predicate-macro :pred-fn)


;; TODO: change this to use fast first buffer
(def distinct-aggregator (predicate aggregator false nil identity (w/first) identity [] []))
;; Aggregator predicate built with (w/fast-first) as its assembly step.
;; NOTE(review): the meaning of the other positional arguments to `predicate`
;; is not visible here; presumably this backs the :distinct option -- confirm.
(def distinct-aggregator (predicate aggregator false nil identity (w/fast-first) identity [] []))


(defstruct predicate-variables :in :out)
Expand Down
7 changes: 6 additions & 1 deletion src/clj/cascalog/workflow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
OutputCollector JobConf]
[java.util Properties Map]
[cascalog ClojureFilter ClojureMapcat ClojureMap
ClojureAggregator Util ClojureBuffer ClojureBufferIter]
ClojureAggregator Util ClojureBuffer ClojureBufferIter FastFirst]
[java.io File]
[java.lang RuntimeException Comparable]))

Expand Down Expand Up @@ -196,6 +196,11 @@
(debug-print "first")
(Every. previous (First.) Fields/RESULTS)))

;; Returns an assembly step: a function that takes the `previous` pipe, logs
;; "fast-first" through debug-print, and returns a new Every built from that
;; pipe, a fresh FastFirst operation, and Fields/RESULTS as the third
;; constructor argument.
(defn fast-first []
(fn [previous]
(debug-print "fast-first")
(Every. previous (FastFirst.) Fields/RESULTS)))

(defn select [keep-fields]
(fn [previous]
(debug-print "select" keep-fields)
Expand Down
38 changes: 38 additions & 0 deletions src/jvm/cascalog/FastFirst.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2010 Nathan Marz
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package cascalog;

import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.flow.FlowProcess;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.operation.BaseOperation;
import java.util.Iterator;

/**
 * Buffer operation that forwards a single entry per invocation: the first
 * element of the arguments iterator is written to the output collector and
 * the rest of the iterator is never read.
 */
public class FastFirst extends BaseOperation implements Buffer {

    /** Creates the operation, passing {@code Fields.ARGS} to the {@code BaseOperation} constructor. */
    public FastFirst() {
        super(Fields.ARGS);
    }

    /**
     * Emits the first entry of the current arguments iterator.
     *
     * @param flowProcess the current flow process (unused)
     * @param bufCall     supplies the arguments iterator and the output collector
     */
    public void operate(FlowProcess flowProcess, BufferCall bufCall) {
        // BufferCall is used as a raw type here, hence the explicit cast.
        TupleEntry firstEntry = ((Iterator<TupleEntry>) bufCall.getArgumentsIterator()).next();
        bufCall.getOutputCollector().add(firstEntry);
    }
}
10 changes: 10 additions & 0 deletions test/cascalog/api_secondary_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,13 @@
(let [tap (cascalog-tap num sink1)]
(?<- tap [?n] (tap ?raw) (inc ?raw :> ?n) (:distinct false))
)))))

;; Checks that c/sum, c/min and c/max each accept several input fields in one
;; predicate and produce one output per field. The three source rows are
;; [1 2 3], [10 20 30] and [100 200 300], so the expected single result row is
;; the column sums (111 222 333), column minimums (1 2 3) and column maximums
;; (100 200 300).
(deftest test-symmetric-ops
(with-tmp-sources [nums [[1 2 3] [10 20 30] [100 200 300]]]
(test?<- [[111 222 333 1 2 3 100 200 300]]
[?s1 ?s2 ?s3 ?min1 ?min2 ?min3 ?max1 ?max2 ?max3]
(nums ?a ?b ?c)
(c/sum ?a ?b ?c :> ?s1 ?s2 ?s3)
(c/min ?a ?b ?c :> ?min1 ?min2 ?min3)
(c/max ?a ?b ?c :> ?max1 ?max2 ?max3))
))

0 comments on commit df2fa15

Please sign in to comment.