### Links
* http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html

### Mixing Scala and Python

In [1]:
print("Hello")

Hello


In [1]:
%%pyspark
print("Hello")

Hello


### Create RDDs, DataFrames and Datasets

Implicits allow quick conversion of collections and RDDs to Datasets and DataFrames.

In [3]:
import sqlContext.implicits._

In [89]:
val rdd = sc.parallelize(Array(1, 2, 3))
rdd.toDS()

[value: int]

In [139]:
sc.makeRDD(1 to 5).toDF("value")

[value: int]

Encoders for most common types are automatically provided by importing sqlContext.implicits._

In [100]:
Seq(1, 2, 3).toDS().collect()

Array(1, 2, 3)

In [101]:
Seq(("Manuel", 36), ("Homer", 33)).toDF("name", "age").collect()

Array([Manuel,36], [Homer,33])

In [160]:
val data = sc.parallelize(
  """{"name":"Manuel","age":36}""" :: """{"name":"Homer","age":33}""" :: Nil)
sqlContext.read.json(data).collect()

Array([36,Manuel], [33,Homer])
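The JSON result lists age before name, unlike the toDF examples. A minimal sketch of why, assuming Spark 1.6-style contexts: the JSON reader infers the schema itself, orders the fields by name, and reads integers as Long. The getOrCreate calls and the app name only make the block standalone; in the notebook the predefined sc and sqlContext can be used instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Standalone setup (assumption); reuses the running context if there is one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("json-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

val json = sc.parallelize("""{"name":"Manuel","age":36}""" :: Nil)
val people = sqlContext.read.json(json)

// Inspect the inferred column names and types.
val columns = people.columns
val types = people.dtypes.toMap
people.printSchema()
```

Because the column order depends on the source, selecting columns by name is safer than relying on position.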

Encoders are also created for case classes.

In [149]:
case class Person(name: String, age: Long)
val df = Seq(Person("Manuel", 36), Person("Homer", 33)).toDF()
df.collect()

Array([Manuel,36], [Homer,33])

DataFrames can be converted to a Dataset by providing a class, as in df.as[Person]. The mapping is done by name.

createDataFrame has no overload that takes a collection together with a list of column names, so sqlContext.createDataFrame(List(1,2,3), Seq("x")) fails with a compile error. Wrap each value in a Tuple1 and name the column with toDF instead:

In [104]:
sqlContext.createDataFrame(List(1, 2, 3).map(Tuple1(_))).toDF("x")
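A minimal sketch of the by-name conversion from a DataFrame to a Dataset, assuming the Spark 1.6 API. The columns are deliberately in a different order than the case class fields to show that position does not matter. The getOrCreate setup and the names people and ds are illustrative, not part of the notebook.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Standalone setup (assumption); reuses the running context if there is one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("as-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

case class Person(name: String, age: Long)

// Column order (age, name) differs from the field order (name, age).
val people = Seq((36L, "Manuel"), (33L, "Homer")).toDF("age", "name")

// as[Person] matches columns to fields by name and yields a typed Dataset.
val ds = people.as[Person]

// Typed operations now see Person objects instead of generic Rows.
val names = ds.map(_.name).collect()
```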

### SQL Temp Tables

In [None]:
df.registerTempTable("person")

In [154]:
sqlContext.tables.show()

+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|   person|       true|
+---------+-----------+



In [156]:
sqlContext.sql("select * from person").show()

+------+---+
|  name|age|
+------+---+
|Manuel| 36|
| Homer| 33|
+------+---+



In [152]:
%%sql select * from person where age > 33

+------+---+
|  name|age|
+------+---+
|Manuel| 36|
+------+---+
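The same filter can be written against the DataFrame API instead of SQL. A sketch under the assumption of Spark 1.6-style contexts; the table name person_sketch is hypothetical and chosen so it does not clash with the person table above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Standalone setup (assumption); reuses the running context if there is one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("temp-table-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

val people = Seq(("Manuel", 36), ("Homer", 33)).toDF("name", "age")
people.registerTempTable("person_sketch") // hypothetical table name

// Query through SQL ...
val viaSql = sqlContext.sql("select name from person_sketch where age > 33")
  .collect().map(_.getString(0))

// ... and the equivalent query through column expressions.
val viaApi = people.filter($"age" > 33).select("name")
  .collect().map(_.getString(0))
```

Both forms go through the same query planner, so the choice is mostly one of readability.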



### Import DataFrames from external sources
Load JSON, Parquet, or Hive tables from local files, HDFS, or S3.

In [1]:
import sqlContext.implicits._  // provides toDF and the encoders used by as[Person]
case class Person(name: String, age: Long)
val people = Seq(("Manuel", 36), ("Homer", 33)).toDF("name", "age")
people.write.mode("overwrite").json("people.json")
// JSON integers are read back as Long, which matches Person.age.
val df = sqlContext.read.json("people.json")
df.as[Person].collect()

### Explode column

In [None]:
val df = sc.makeRDD(Seq("a,b","c,d")).toDF("words")
df.show()
df.explode("words", "word"){words:String => words.split(",")}.show()
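An alternative sketch that expresses the same split-and-explode step with column functions rather than a Scala closure, assuming the explode and split functions from org.apache.spark.sql.functions (Spark 1.5 or later). The getOrCreate setup is only there to make the block standalone.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{explode, split}

// Standalone setup (assumption); reuses the running context if there is one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("explode-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

val df = sc.makeRDD(Seq("a,b", "c,d")).toDF("words")

// split turns each string into an array column; explode emits one row per element.
val words = df.select(explode(split($"words", ",")).as("word"))
val result = words.collect().map(_.getString(0))
```

Unlike df.explode, this version returns only the new word column; add $"words" to the select to keep the original column as well.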

### Manipulate DataFrames with Map, FlatMap, Reduce, ...

In [3]:
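val distData = sc.parallelize(1 to 5)  // assumed definition (not in the notebook); consistent with the result 15
distData.reduce((a, b) => a + b)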

15
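The heading also mentions map and flatMap. A minimal sketch of how they differ on an RDD, using made-up input lines: map produces exactly one output element per input element, flatMap may produce any number, and reduce folds the elements into a single value.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Standalone setup (assumption); reuses the running context if there is one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("rdd-sketch"))

val lines = sc.parallelize(Seq("a,b", "c,d,e")) // illustrative data

// flatMap: each line expands into several tokens.
val tokens = lines.flatMap(_.split(","))
val tokenCount = tokens.count()

// map: each line becomes exactly one number (its length).
val lengths = lines.map(_.length)

// reduce: combine all lengths into one total.
val totalChars = lengths.reduce(_ + _)
```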

### WordCount

In [1]:
import sqlContext.implicits._

In [2]:
val textFile = sc.textFile("README.md")

In [3]:
val wordCount = textFile.map(_.toLowerCase).
    flatMap(_.split("[ ,.*()?#\\[\\]]")).
    filter(!_.isEmpty).
    map(word => (word,1)).
    reduceByKey(_ + _).toDF("word", "count")

In [4]:
wordCount.orderBy("count").show(3)

+---------+-----+
|     word|count|
+---------+-----+
|necessary|    1|
|community|    1|
|  running|    1|
+---------+-----+
only showing top 3 rows
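orderBy("count") sorts ascending, so the table above shows the least frequent words. A sketch of the descending variant on a small in-memory input (the lines are made up so the block does not depend on README.md), assuming the Spark 1.6 API:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Standalone setup (assumption); reuses the running context if there is one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("wordcount-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

val lines = sc.parallelize(Seq("a b a", "c a b")) // illustrative data

val wordCount = lines.
  flatMap(_.split(" ")).
  map(word => (word, 1)).
  reduceByKey(_ + _).
  toDF("word", "count")

// Sort by the count column in descending order and take the most frequent word.
val top = wordCount.orderBy($"count".desc).first()
```

With this input, a occurs three times, b twice, and c once, so top holds the row for a.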



### ML, Logistic regression

In [57]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance.  This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01)

// Learn a LogisticRegression model.  This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20).
  put(lr.maxIter, 30). // Specify 1 Param.  This overwrites the original maxIter.
  put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test).
  select("features", "label", "myProbability", "prediction").
  collect().
  foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }

LogisticRegression parameters:
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
regParam: regularization parameter (>= 0) (default: 0.0)
standardization: whether to standardize the training features before fitting the model (default: true)
threshold: threshold in binary