### Enable Spark in Jupyter (BeakerX)
> Use this cell only if you run the notebook with BeakerX. It adds Spark SQL from Maven to the kernel classpath.

```
%%classpath add mvn
org.apache.spark spark-sql_2.11 2.1.1
```

## Start Spark

[Spark Examples](https://spark.apache.org/examples.html)

[Scala Tutorial for Java Programmers](http://backtobazics.com/tutorials/scala-tutorial-for-java-programmers/)

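```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setMaster("local[*]")            // run locally with one worker thread per core
  .set("spark.driver.memory", "4g") // likely ignored here: driver memory must be set before the driver JVM starts

val spark = SparkSession
  .builder()
  .appName("Spark Demo")
  .config(conf)
  .getOrCreate()

val sc = spark.sparkContext
sc.setLogLevel("ERROR") // hide INFO/WARN log noise
```

In `spark-shell`, `spark` (a `SparkSession`) and `sc` (its `SparkContext`) are created automatically, so this cell is only needed in a notebook.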

### Read File as RDD

```scala
// One record per line; dataset_1.csv has no header row
case class LotInfo(TOOL: String, STEP: String, MATERIAL: String, V1: Double, V2: Int)

val textFile = sc.textFile("data/dataset_1.csv")
  .map(line => line.split(","))
  .map(p => LotInfo(p(0), p(1), p(2), p(3).toDouble, p(4).toInt))
```

```
MapPartitionsRDD[3] at map at <console>:16
```

```scala
textFile.take(5)
```

```
[LotInfo(﻿tool 1,step 1,material 1,0.869495157,318), LotInfo(tool 1,step 1,material 1,0.357759255,579), LotInfo(tool 1,step 1,material 1,0.255350443,188), LotInfo(tool 1,step 1,material 1,0.774128219,512), LotInfo(tool 1,step 1,material 1,0.272550202,110)]
```
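An RDD of case-class records can be turned into a DataFrame with `toDF()`, which takes the column names from the case-class fields. A minimal sketch, assuming two made-up CSV lines in place of `data/dataset_1.csv` and a hypothetical `Lot` class that mirrors `LotInfo`:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical mirror of LotInfo so this block runs on its own
case class Lot(TOOL: String, STEP: String, MATERIAL: String, V1: Double, V2: Int)

// Reuses the notebook's session if one exists
val spark = SparkSession.builder().master("local[*]").appName("rdd-to-df").getOrCreate()
import spark.implicits._ // brings rdd.toDF() into scope

// Inline lines stand in for the CSV file
val lines = spark.sparkContext.parallelize(Seq(
  "tool 1,step 1,material 1,0.5,318",
  "tool 2,step 2,material 3,0.25,579"))

val lots = lines
  .map(_.split(","))
  .map(p => Lot(p(0), p(1), p(2), p(3).toDouble, p(4).toInt))

// Column names and types come from the case-class fields
val lotsDf = lots.toDF()
lotsDf.printSchema()
```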

### Read File as DF

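```scala
val df = spark.read
  .format("csv")                   // built in since Spark 2.0; "com.databricks.spark.csv" is accepted as an alias
  .option("header", "true")        // first line holds the column names
  .option("mode", "DROPMALFORMED") // skip rows that cannot be parsed
  .load("data/dataset_2.csv")
```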

```scala
df.printSchema
```

```
root
 |-- TOOL: string (nullable = true)
 |-- STEP: string (nullable = true)
 |-- MATERIAL: string (nullable = true)
 |-- VALUE_1: string (nullable = true)
 |-- VALUE_2: string (nullable = true)
```

Every column is typed `string` because the reader was not asked to infer a schema.

```scala
df.show(5)
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.958547461|     35|
|tool 1|step 1|material 1|0.542267277|    589|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.820385525|    500|
+------+------+----------+-----------+-------+
only showing top 5 rows
```
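To get numeric columns instead of strings, the CSV reader's `inferSchema` option guesses column types at the cost of an extra pass over the data. A minimal sketch, assuming a tiny made-up temp file in place of `data/dataset_2.csv`:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

val spark = SparkSession.builder().master("local[*]").appName("csv-infer").getOrCreate()

// Hypothetical one-row sample with the same header as dataset_2.csv
val tmp = Files.createTempFile("dataset_2_sample", ".csv")
Files.write(tmp,
  "TOOL,STEP,MATERIAL,VALUE_1,VALUE_2\ntool 1,step 1,material 1,0.715171243,986\n"
    .getBytes(StandardCharsets.UTF_8))

val typed = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // scan the data to pick int/double/string per column
  .csv(tmp.toString)

typed.printSchema()
```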

### Read DF from Hive

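`spark.sql` only sees Hive tables if the session was built with `.enableHiveSupport()` (which also requires the Hive classes on the classpath).

```scala
val hql: String = """
select
   sample_date,
   sample_id,
   ch_id,
   ckc_id
from prod_mti_singapore_fab_10_spc_dm.samples_calc
where sys_part_yyyy = 2018 and sys_part_mm = 06
"""

val df = spark.sql(hql)
```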
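`spark.sql` is not limited to Hive: any DataFrame registered as a temporary view can be queried by name. A minimal sketch with hypothetical rows and a hypothetical view name `samples`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sql-view").getOrCreate()
import spark.implicits._

// Hypothetical rows; in the notebook this could be the CSV-backed df
val samples = Seq(("tool 1", 0.7), ("tool 1", 0.9), ("tool 2", 0.4)).toDF("TOOL", "VALUE_1")

// Make the DataFrame visible to SQL under the name "samples" (session-scoped)
samples.createOrReplaceTempView("samples")

val perTool = spark.sql(
  "select TOOL, count(*) as n from samples group by TOOL order by TOOL")
perTool.show()
```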

## DataFrame Operations

- show

```scala
df.show(30)
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.958547461|     35|
|tool 1|step 1|material 1|0.542267277|    589|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.820385525|    500|
|tool 1|step 1|material 1|0.116683078|    914|
|tool 1|step 1|material 2|0.830316155|    783|
|tool 1|step 1|material 2|0.296609434|    261|
|tool 1|step 1|material 2|0.381470171|    139|
|tool 1|step 1|material 2|0.285614758|    663|
|tool 2|step 1|material 2|0.397976087|    880|
|tool 2|step 1|material 2|0.134806317|    558|
|tool 2|step 1|material 3|0.602803637|    570|
|tool 2|step 1|material 3|0.080131348|    891|
|tool 2|step 1|material 3|0.228636809|    657|
|tool 2|step 2|material 3|0.688289551|    788|
|tool 2|step 2|material 3|0.359787374|    991|
|tool 2|step 2|material 3| 0.21757853|    144|
```

- filter

```scala
import org.apache.spark.sql.functions.lit

// === builds an equality Column; lit() is optional, since === also accepts a plain String
df.filter(df("TOOL") === lit("tool 1")).show(5)
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.958547461|     35|
|tool 1|step 1|material 1|0.542267277|    589|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.820385525|    500|
+------+------+----------+-----------+-------+
only showing top 5 rows
```

```scala
// Same predicate written as a SQL expression string
df.filter("TOOL = 'tool 1'").show(5)
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.958547461|     35|
|tool 1|step 1|material 1|0.542267277|    589|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.820385525|    500|
+------+------+----------+-----------+-------+
only showing top 5 rows
```
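Conditions can be combined with `&&` and `||` on `Column` objects. A minimal sketch, assuming hypothetical rows shaped like dataset_2 with `VALUE_2` kept as a string (as in the schema above), so it is cast before the numeric comparison:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("filter-and").getOrCreate()
import spark.implicits._

// Hypothetical sample rows
val rows = Seq(
  ("tool 1", "step 1", "986"),
  ("tool 1", "step 1", "35"),
  ("tool 2", "step 1", "880")).toDF("TOOL", "STEP", "VALUE_2")

// Keep tool 1 rows whose VALUE_2 (cast to int) exceeds 500
val bigTool1 = rows.filter(col("TOOL") === "tool 1" && col("VALUE_2").cast("int") > 500)
bigTool1.show()
```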

- withColumn

> `withColumn(colName: String, col: Column)` returns a new DataFrame with the column added, or replaced if a column with that name already exists.

```scala
// VALUE_1 is a string column; Spark implicitly casts it to double for the arithmetic
df.withColumn("NEW_COL", df("VALUE_1") * 3).show(5)
```

```
+------+------+----------+-----------+-------+------------------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|           NEW_COL|
+------+------+----------+-----------+-------+------------------+
|tool 1|step 1|material 1|0.715171243|    986|       2.145513729|
|tool 1|step 1|material 1|0.958547461|     35|2.8756423829999997|
|tool 1|step 1|material 1|0.542267277|    589|1.6268018309999999|
|tool 1|step 1|material 1|0.778020313|    967|       2.334060939|
|tool 1|step 1|material 1|0.820385525|    500|       2.461156575|
+------+------+----------+-----------+-------+------------------+
only showing top 5 rows
```
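Rather than relying on implicit casting, `withColumn` can overwrite a string column with a typed version first. A minimal sketch on hypothetical rows:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

val spark = SparkSession.builder().master("local[*]").appName("cast-col").getOrCreate()
import spark.implicits._

// Hypothetical rows with VALUE_1 stored as text, as in dataset_2
val raw = Seq(("tool 1", "0.5"), ("tool 2", "0.25")).toDF("TOOL", "VALUE_1")

val typed = raw
  .withColumn("VALUE_1", col("VALUE_1").cast(DoubleType)) // same name: replaces the string column
  .withColumn("NEW_COL", col("VALUE_1") * 3)              // arithmetic on a real double column
typed.printSchema()
```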

- select

```scala
df.select("TOOL").show(5)
```

```
+------+
|  TOOL|
+------+
|tool 1|
|tool 1|
|tool 1|
|tool 1|
|tool 1|
+------+
only showing top 5 rows
```

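- toPandas()
> PySpark only. It collects the entire DataFrame to the driver as a pandas DataFrame, so use it only when the data fits in driver memory.

```python
df_pd = df.toPandas()
```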

- join
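A minimal sketch of joining two DataFrames on a shared column, assuming a hypothetical lookup table that maps each tool to an area. Passing the key as `Seq("TOOL")` keeps a single `TOOL` column in the result:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-demo").getOrCreate()
import spark.implicits._

// Hypothetical measurement rows and lookup table
val measurements = Seq(("tool 1", 0.7), ("tool 2", 0.4), ("tool 3", 0.9)).toDF("TOOL", "VALUE_1")
val toolInfo = Seq(("tool 1", "area A"), ("tool 2", "area B")).toDF("TOOL", "AREA")

// Inner join (the default): only tools present on both sides
val inner = measurements.join(toolInfo, Seq("TOOL"))

// Left outer join: every measurement is kept; tool 3 gets AREA = null
val left = measurements.join(toolInfo, Seq("TOOL"), "left_outer")
left.show()
```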

- drop

```scala
df.drop("TOOL").show(5)
```

```
+------+----------+-----------+-------+
|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+----------+-----------+-------+
|step 1|material 1|0.715171243|    986|
|step 1|material 1|0.958547461|     35|
|step 1|material 1|0.542267277|    589|
|step 1|material 1|0.778020313|    967|
|step 1|material 1|0.820385525|    500|
+------+----------+-----------+-------+
only showing top 5 rows
```

## RDD Operations

- map

```scala
val tf = sc.textFile("data/dataset_1.csv")
```

```
data/dataset_1.csv MapPartitionsRDD[74] at textFile at <console>:108
```

```scala
tf.take(1)
```

```
[﻿tool 1,step 1,material 1,0.869495157,318]
```

```scala
// One output element (an Array[String]) per input line
val tfRdd = tf.map(line => line.split(","))
```

```
MapPartitionsRDD[79] at map at <console>:108
```

```scala
tfRdd.take(2)
```

```
[[﻿tool 1, step 1, material 1, 0.869495157, 318], [tool 1, step 1, material 1, 0.357759255, 579]]
```

- flatMap

```scala
// Each line's fields are flattened into one RDD[String]
val tfFmRdd = tf.flatMap(line => line.split(","))
```

```
MapPartitionsRDD[78] at flatMap at <console>:108
```

```scala
val s = tfFmRdd.take(2)
```

```
[﻿tool 1, step 1]
```
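The difference between the two cells above is easiest to see by counting: `map` yields one element per line, while `flatMap` yields one element per field. A minimal sketch on two inline lines copied from the dataset's format:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("map-vs-flatmap").getOrCreate()

val lines = spark.sparkContext.parallelize(Seq(
  "tool 1,step 1,material 1,0.869495157,318",
  "tool 1,step 1,material 1,0.357759255,579"))

val perLine  = lines.map(_.split(","))     // RDD[Array[String]]: 2 arrays
val perField = lines.flatMap(_.split(",")) // RDD[String]: 10 individual fields

println(s"map: ${perLine.count()}, flatMap: ${perField.count()}")
```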

- filter

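```scala
tf.map(line => line.split(",")).filter(s => s(0) == "tool 1").take(5)
```

```
[[tool 1, step 1, material 1, 0.357759255, 579], [tool 1, step 1, material 1, 0.255350443, 188], [tool 1, step 1, material 1, 0.774128219, 512], [tool 1, step 1, material 1, 0.272550202, 110], [tool 1, step 1, material 1, 0.327189103, 11]]
```

The file's first row (value `0.869495157`) is missing from this result. Its first field begins with an invisible UTF-8 byte-order mark (U+FEFF, carried along in the `take` outputs above), so it is not equal to `"tool 1"`.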
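One way to make the first row match is to drop a leading byte-order mark before splitting. A minimal sketch; `stripBom` is a hypothetical helper, not a Spark API:

```scala
// U+FEFF is the byte-order mark some editors write at the start of a UTF-8 file
val Bom: Char = 0xFEFF.toChar

// Hypothetical helper: remove a leading BOM if present, otherwise return the line unchanged
def stripBom(line: String): String =
  if (line.nonEmpty && line.charAt(0) == Bom) line.substring(1) else line

// Usage with the RDD from above:
// tf.map(stripBom).map(_.split(",")).filter(s => s(0) == "tool 1").take(5)
```

Note that `String.trim` does not help here: it only removes characters up to U+0020, and U+FEFF is above that range.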

- reduceByKey

<img src="img/spark-reduceByKey.png" width="600" height="450" style="display: block; margin: 0 auto">


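```scala
val tf = sc.textFile("data/dataset_1.csv")
tf.map(line => line.split("\t"))        // NOTE: tab delimiter, while the cells above split this file on ","
  .map(s => (s(1).trim, s(3).toDouble)) // (STEP, value)
  .reduceByKey(_ + _)                   // sum of values per step
  .take(10)
```

```
[(step 2,6.579661281000001), (step 1,7.814073281000001)]
```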
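`reduceByKey` also handles aggregates that are not a plain sum, as long as they can be combined pairwise. A minimal sketch computing a per-key average by carrying `(sum, count)` pairs, on hypothetical `(STEP, value)` data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("avg-by-key").getOrCreate()

// Hypothetical (STEP, value) pairs
val pairs = spark.sparkContext.parallelize(Seq(
  ("step 1", 1.0), ("step 1", 3.0), ("step 2", 4.0)))

val avgByStep = pairs
  .mapValues(v => (v, 1L))                               // value -> (sum, count)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))     // add sums and counts per key
  .mapValues(sumCount => sumCount._1 / sumCount._2)      // divide once per key
  .collectAsMap()
```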

- groupByKey

<img src="img/spark-groupByKey.png" width="600" height="450" style="display: block; margin: 0 auto">


```scala
val tf = sc.textFile("data/dataset_1.csv")
tf.map(line => line.split("\t"))        // NOTE: tab delimiter, while earlier cells split this file on ","
  .map(s => (s(0).trim, s(3).toDouble)) // (TOOL, value)
  .groupByKey()                         // all values for each tool in one CompactBuffer
  .foreach(println)                     // runs on the executors; in local mode it prints to the driver console
```

```
(tool 2,CompactBuffer(0.913789601, 0.450131428, 0.335269137, 0.240003101, 0.680100259, 0.177084315, 0.409285299, 0.467945493, 0.375077091, 0.536814527))
(tool 3,CompactBuffer(0.479830267, 0.304870537, 0.522640386, 0.02014818, 0.279830528, 0.271758789, 0.436192934, 0.933186634, 0.73414128, 0.630855021))
(tool 1,CompactBuffer(0.869495157, 0.357759255, 0.255350443, 0.774128219, 0.272550202, 0.327189103, 0.642169036, 0.651917744, 0.759575849, 0.284644747))
```
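For aggregations, `groupByKey` and `reduceByKey` give the same answer, but `groupByKey` ships every value through the shuffle, whereas `reduceByKey` combines values within each partition first. A minimal sketch on hypothetical `(TOOL, value)` pairs:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("group-vs-reduce").getOrCreate()

// Hypothetical (TOOL, value) pairs
val pairs = spark.sparkContext.parallelize(Seq(
  ("tool 1", 0.5), ("tool 1", 0.25), ("tool 2", 1.0)))

// Collect all values per key, then sum them after the shuffle
val viaGroup = pairs.groupByKey().mapValues(_.sum).collectAsMap()

// Partial sums per partition, then merge: less data crosses the network
val viaReduce = pairs.reduceByKey(_ + _).collectAsMap()
```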

- foreach
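`foreach` runs a side-effecting function on every element, on the executors. On a cluster, `println` inside it lands in executor logs rather than the notebook, so results meant for the driver are better collected or gathered with an accumulator. A minimal sketch on hypothetical values:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("foreach-demo").getOrCreate()

// Hypothetical values
val values = spark.sparkContext.parallelize(Seq(318, 579, 188))

// Executes on the executors (visible here only because the master is local)
values.foreach(v => println(v))

// Prints on the driver; only safe for small results
values.collect().foreach(println)

// Accumulator: executors add to it, the driver reads the total
val total = spark.sparkContext.longAccumulator("total")
values.foreach(v => total.add(v.toLong))
println(total.sum)
```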

## Save Data

- saveAsTextFile

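> A path without a scheme is resolved against Hadoop's default file system (`fs.defaultFS`), which is HDFS on a typical cluster. Prefix the path with `file:///` to write to the local file system.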

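```scala
tf.map(line => line.split("\t"))        // NOTE: tab delimiter, as in the two cells above
  .map(s => (s(0).trim, s(3).toDouble))
  .groupByKey()
  .repartition(1)                       // one partition, so a single part file
  .saveAsTextFile("file:///data/sampleDataOutput") // fails if the directory already exists
```

Each record is written using its `toString`, for example `(tool 1,CompactBuffer(...))`.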
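To produce CSV-style lines instead of tuple `toString` output, map each record to a string before saving. A minimal sketch, assuming a fresh temp directory as the target (so the "directory already exists" error cannot occur) and using `coalesce(1)`, which also yields one part file but avoids the full shuffle of `repartition(1)`:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("save-text").getOrCreate()

// Hypothetical (TOOL, value) pairs
val pairs = spark.sparkContext.parallelize(Seq(("tool 1", 0.5), ("tool 2", 0.25)))

// Non-existent sub-directory of a new temp dir, as a file:/// URI
val outDir = Files.createTempDirectory("spark-out").resolve("pairs").toUri.toString

pairs
  .map { case (tool, v) => s"$tool,$v" } // "tool 1,0.5" instead of "(tool 1,0.5)"
  .coalesce(1)                            // merge into one partition without a full shuffle
  .saveAsTextFile(outDir)

// Read the part file(s) back; _SUCCESS and .crc files are skipped
val readBack = spark.sparkContext.textFile(outDir).collect().sorted
```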

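- write (DataFrame)

```scala
df.write
  .format("csv")            // built-in CSV writer; "com.databricks.spark.csv" is an alias
  .option("header", "true") // write column names as the first line
  .save("file:///data/sampleDataDFOutput")
```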
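A minimal round-trip sketch of the DataFrame writer, assuming hypothetical rows and a temp directory: `mode("overwrite")` replaces an existing output directory instead of failing, and reading back with `header` recovers the column names:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("df-write").getOrCreate()
import spark.implicits._

// Hypothetical rows
val out = Files.createTempDirectory("df-out").resolve("csv").toUri.toString
val small = Seq(("tool 1", 0.5), ("tool 2", 0.25)).toDF("TOOL", "VALUE_1")

small.coalesce(1)             // one part file
  .write
  .mode("overwrite")          // replace the directory if it already exists
  .option("header", "true")   // column names as the first line
  .csv(out)

val back = spark.read.option("header", "true").csv(out)
back.show()
```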