
Initial check in
mlimotte committed Jan 9, 2013
1 parent 684a14f commit 105039b
Showing 10 changed files with 618 additions and 1 deletion.
10 changes: 10 additions & 0 deletions .gitignore
@@ -0,0 +1,10 @@
/target
/lib
/classes
/checkouts
pom.xml
*.jar
*.class
.lein-deps-sum
.lein-failures
.lein-plugins
146 changes: 145 additions & 1 deletion README.md
@@ -1,4 +1,148 @@
clj-spark
=========

A Clojure api for the Spark project (a fast, open source cluster computing system).
A Clojure API for the Spark project. It is usable, but not complete. I
provide it as a starting point; it should be simple enough to add the
additional wrappers as you need them. This is the result of about two weeks
of work.

It handles many of the initial problems like serializing anonymous functions,
converting back and forth between Scala Tuples and Clojure seqs, and
converting RDDs to PairRDDs.

### Example usage

There is a complete sample program in src/clj_spark/examples/query.clj. To run
it, clone this repo and cd into it. You will need Leiningen 2 installed (the
commands below assume it is available on your PATH as lein2):

$ git clone ...
$ cd clj-spark
$ lein2 deps
$ lein2 compile
$ lein2 run
Compiling clj-spark.api
2013-01-02 13:18:41.477 java[65466:1903] Unable to load realm mapping info from
SCDynamicStore
==============
Premium Per State
NY 600.0

==============
TOP100
#{1 2}

==============
CTE Per State
NY 70.0

==============
TOP100PERSTATE
{NY #{1 2}}

==============
Standalone CTE Per State
NY 70.0
==============

The following snippets are copied from query.clj:

Here is a sample of creating an RDD:

(-> (.textFile sc testfile)
(k/map k/csv-split)
; _,policy-id,field-id,_,_,element-id,element-value
(k/map (k/feach identity as-integer as-long identity identity as-integer as-double))
(k/map (juxt (k/fchoose 1 2) (k/fchoose 5 6))) ; [ [policy-id field-id] [element-id element-value] ]
k/cache)

And a sample query on that data:

(-> input-rdd
(k/map second) ; [element-id element-value]
(k/reduce-by-key +) ; [element-id total]
(k/map (k/fchoose 1 0)) ; [total element-id]
(k/sort-by-key false) ; desc
(k/map second) ; element-id
(k/take 2) ; TODO n=100
set)

Running queries from the REPL
==========

You can also start a repl and play around:

# assuming you already did deps and compile above.
lein2 repl

; some output omitted for brevity
user=> (use 'serializable.fn 'clj-spark.util)
user=> (require '[clj-spark.api :as k])
user=> (def sc (k/spark-context :master "local" :job-name "Simple Job"))
user=> (def r1 (k/parallelize sc [10 20 25 30 35]))
user=> (def r2 (k/text-file sc "test/resources/input.csv"))

user=> (k/count r2)
5
user=> (def result (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +)))
#'clj-spark.examples.query/result
user=> (k/collect result)
#<ArrayList [[false 63], [true 62]]>
; or, all in one step:
user=> (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +) k/collect)
#<ArrayList [[false 63], [true 62]]>

Other Clojure APIs
==========

After working on this, I found another Clojure API for the Spark project: https://github.com/markhamstra/spark/tree/master/cljspark

It's a bit more complete, but has no examples. You might find good ideas in both projects.


What is Spark
==========

From http://spark-project.org/docs/latest/index.html

Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”).

Known Issues
==========

## Function serialization

You must create your anonymous functions using serializable.fn/fn, as in:

(ns ...
  (:require [serializable.fn :as sfn]))

(sfn/fn my-inc [x] (+ x 1))

Do not use clojure.core/fn or #(). This is necessary because the anonymous function must be serialized so it can be passed around to distributed tasks.
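
A minimal sketch of passing such a function to one of the wrappers (assuming the
k alias for clj-spark.api used in the examples above; some-rdd stands in for any
RDD created with k/text-file or k/parallelize):

(require '[serializable.fn :as sfn]
         '[clj-spark.api :as k])

; the anonymous fn is serialized so it can be shipped to the distributed tasks
(k/map some-rdd (sfn/fn [x] (+ x 1)))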

## AOT compilation

Generally speaking, any functions used in the Spark calls need to be part of AOT compiled namespaces, i.e. they need to be compiled or the distributed Spark tasks will not be able to find them. In some cases, compiling on the fly might also work:

(compile 'your-namespace)

But you need to do this somewhere it will be executed for each task.
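
The usual way to arrange this is to list the relevant namespaces under the :aot
key in project.clj (this project's project.clj, shown below, does exactly that),
for example:

:aot [clj-spark.spark.functions
      clj-spark.api
      clj-spark.examples.query]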

NOTE: This should be avoidable by using serializable.fn as above, but I did not get that to work in my initial attempts.

## None of the Double* methods are implemented

The Spark Java API provides versions of some methods that accept or return Doubles. E.g. (copied from Spark docs, using Scala syntax):

def map[R](f: DoubleFunction[T]): JavaDoubleRDD

So class DoubleFunction<T> has a function type of T => Double

Compare this to the standard:

def map[R](f: Function[T, R]): JavaRDD[R]

Where Function<T, R> has type T => R

I didn't wrap any of these. To be honest, I don't see why they are needed. Maybe I'm missing something, or maybe it just doesn't matter when called from a dynamically typed language like Clojure. Instead of DoubleFunction[T], just use Function<T, Double>, which has type T => Double. I don't see why this wouldn't work, but I'm interested to know if there is a case where this fails or is sub-optimal.
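
As a sketch (using r1 from the REPL session above, not copied from query.clj),
computing a double-valued result through the ordinary wrappers looks like:

(-> r1
    (k/map (fn [x] (double (/ x 2))))  ; effectively a Function<T, Double>
    (k/reduce +))                      ; plain fn as in the REPL session; use sfn/fn for distributed jobs
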
7 changes: 7 additions & 0 deletions log4j.properties
@@ -0,0 +1,7 @@
# log4j config for clojure development
log4j.rootLogger=warn, stdout

# Console appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p %c{1}:%L - %m%n
22 changes: 22 additions & 0 deletions project.clj
@@ -0,0 +1,22 @@
(defproject clj-spark/clj-spark "0.1.0-SNAPSHOT"

:min-lein-version "2.0.0"

:license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"}
:description "Clojure API wrapper on the Spark project (http://spark-project.org/)"
:url "https://github.com/TheClimateCorporation/clj-spark"

:jvm-opts ["-Dlog4j.configuration=file:log4j.properties"]

:dependencies [[org.clojure/clojure "1.4.0"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/tools.cli "0.2.1"]
[org.clojars.mlimotte/serializable-fn "0.0.3"]
[org.spark-project/spark-core_2.9.2 "0.6.1"]]

:aot [clj-spark.spark.functions
clj-spark.api
weatherbill.query
clj-spark.examples.query]

:main clj-spark.examples.query)
168 changes: 168 additions & 0 deletions src/clj_spark/api.clj
@@ -0,0 +1,168 @@
(ns clj-spark.api
(:use clj-spark.spark.functions)
(:refer-clojure :exclude [map reduce count filter first take distinct])
(:require
[serializable.fn :as sfn]
[clojure.string :as s]
[clj-spark.util :as util])
(:import
java.util.Comparator
spark.api.java.JavaSparkContext))

; Helpers

(defn spark-context
[& {:keys [master job-name]}]
;JavaSparkContext(master: String, jobName: String, sparkHome: String, jars: Array[String], environment: Map[String, String])
(JavaSparkContext. master job-name))

(defn- untuple
  "Convert a Scala Tuple2 into a Clojure vector [k v]."
  [t]
  [(._1 t) (._2 t)])

(defn- double-untuple
"Convert (k, (v, w)) to [k [v w]]."
[t]
(let [[x t2] (untuple t)]
(vector x (untuple t2))))

(def csv-split util/csv-split)

(defn ftopn
"Return a fn that takes (key, values), sorts the values in DESC order,
and takes the top N values. Returns (k, top-values)."
[n]
(fn [[k values]]
(vector k (->> values (sort util/rcompare) (clojure.core/take n)))))
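
;; Usage sketch (an assumption, not part of the original source): keeping the two
;; largest values per key, e.g. (-> rdd group-by-key (map (ftopn 2)))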

(defn fchoose
  "Return a fn that selects the elements of coll at the given indices."
  [& indices]
  (fn [coll]
    (util/choose coll indices)))

(defn ftruthy?
[f]
(sfn/fn [x] (util/truthy? (f x))))

(defn feach
"Mostly useful for parsing a seq of Strings to their respective types. Example
(k/map (k/feach as-integer as-long identity identity as-integer as-double))
Implies that each entry in the RDD is a sequence of 6 things. The first element should be
parsed as an Integer, the second as a Long, etc. The actual functions supplied here can be
any arbitray transformation (e.g. identity)."
[& fs]
(fn [coll]
(clojure.core/map (fn [f x] (f x)) fs coll)))

; RDD construction

(defn text-file
[spark-context filename]
(.textFile spark-context filename))

(defn parallelize
[spark-context lst]
(.parallelize spark-context lst))

; Transformations

(defn echo-types
; TODO make this recursive
[c]
(if (coll? c)
(println "TYPES" (clojure.core/map type c))
(println "TYPES" (type c)))
c)

(defn trace
[msg]
(fn [x]
(prn "TRACE" msg x)
x))

(defn map
[rdd f]
(.map rdd (function f)))

(defn reduce
[rdd f]
(.reduce rdd (function2 f)))

(defn flat-map
  [rdd f]
  (.flatMap rdd (flat-map-function f)))

(defn filter
[rdd f]
(.filter rdd (function (ftruthy? f))))

(defn foreach
[rdd f]
(.foreach rdd (void-function f)))

(defn aggregate
[rdd zero-value seq-op comb-op]
(.aggregate rdd zero-value (function2 seq-op) (function2 comb-op)))

(defn fold
[rdd zero-value f]
(.fold rdd zero-value (function2 f)))

(defn reduce-by-key
  [rdd f]
  (-> rdd
    (.map (pair-function identity))   ; JavaRDD of [k v] pairs -> JavaPairRDD
    (.reduceByKey (function2 f))
    (.map (function untuple))))       ; back to a JavaRDD of [k v] vectors

(defn group-by-key
[rdd]
(-> rdd
(.map (pair-function identity))
.groupByKey
(.map (function untuple))))

(defn sort-by-key
([rdd]
(sort-by-key rdd compare true))
([rdd x]
; Note: RDD has a .sortByKey signature with just a Boolean arg, but it doesn't
; seem to work when I try it, bool is ignored.
(if (instance? Boolean x)
(sort-by-key rdd compare x)
(sort-by-key rdd x true)))
([rdd compare-fn asc?]
(-> rdd
(.map (pair-function identity))
(.sortByKey
(if (instance? Comparator compare-fn)
compare-fn
(comparator compare-fn))
(util/truthy? asc?))
(.map (function untuple)))))

(defn join
[rdd other]
(-> rdd
(.map (pair-function identity))
(.join (.map other (pair-function identity)))
(.map (function double-untuple))))

; Actions

(def first (memfn first))

(def count (memfn count))

(def glom (memfn glom))

(def cache (memfn cache))

(def collect (memfn collect))

; take defined with memfn fails with an ArityException, so doing this instead:
(defn take
[rdd cnt]
(.take rdd cnt))

(def distinct (memfn distinct))
12 changes: 12 additions & 0 deletions src/clj_spark/examples/extra.clj
@@ -0,0 +1,12 @@
(ns clj-spark.examples.extra
"This namespace simulates getting a dataset from a distinct source. This could be
a SQL query, for example.")

(defn get-data
"Returns a result set as a seq of Maps. Similar to a result set acquired by clojure.data.jdbc."
[]
(map (partial zipmap [:policy_id :field_id :state :policy_premium :acres])
[[(int 1) 10 "NY" 100.0 2]
[(int 1) 20 "NY" 200.0 2]
[(int 2) 10 "CT" 300.0 2]
[(int 2) 11 "CT" 400.0 2]]))
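
;; Usage sketch (an assumption, not copied from query.clj): this seq of maps can be
;; turned into an RDD via the api wrapper, e.g. (clj-spark.api/parallelize sc (get-data))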
