## Advantages of Spark

1. Spark is easy to program and does not require much hand coding, whereas MapReduce is harder to program and requires a lot of hand coding.
2. Spark has an interactive mode (spark-shell), whereas MapReduce has no built-in interactive mode because it was developed for batch processing.
3. Spark can handle streaming, machine learning, and batch processing, whereas Hadoop MapReduce only provides a batch engine. Spark is a general-purpose cluster computing engine.
4. Spark can execute batch processing jobs about 10 to 100 times faster than Hadoop MapReduce, depending on the workload.
5. Spark uses an abstraction called RDD (Resilient Distributed Dataset), which makes Spark feature-rich, whereas MapReduce has no such abstraction.
6. Spark achieves lower latency by caching partial or complete results in memory across distributed nodes, whereas MapReduce is completely disk-based.
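To make point 6 concrete, here is a minimal sketch assuming Spark 2.x on a local master; the object and method names are illustrative only. `cache()` asks Spark to keep the RDD's partitions in memory once the first action has computed them, so the second action can reuse them instead of recomputing the whole lineage.

```scala
import org.apache.spark.sql.SparkSession

object CacheSketch {
  // Runs two actions on the same RDD; the second one can reuse the cached partitions
  def sumAndCount(spark: SparkSession): (Long, Long) = {
    val squares = spark.sparkContext
      .parallelize(1 to 1000)
      .map(x => x.toLong * x)
      .cache()                          // keep partitions in memory once computed

    val total = squares.reduce(_ + _)   // first action: computes (and caches) the RDD
    val count = squares.count()         // second action: reads the cached partitions
    (total, count)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()
    println(sumAndCount(spark))
    spark.stop()
  }
}
```

Caching only pays off when an RDD is reused by more than one action; `unpersist()` releases the memory again.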

Source: https://community.hortonworks.com/questions/129786/benefits-of-spark-over-mapreduce-or-spark-vs-mapre.html

Detailed Comparison: https://data-flair.training/blogs/apache-spark-vs-hadoop-mapreduce/

## RDD

<img src="img/RDD_DATAFRAME_DATASETS.png" width="400" height="auto" />

#### What is RDD?

Source: https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/

- Spark RDD APIs – RDD stands for Resilient Distributed Dataset. It is a __read-only__, partitioned collection of records. RDD is the fundamental data structure of Spark. It allows a programmer to perform in-memory computations on large clusters in a fault-tolerant manner, which speeds up tasks. Follow [this link to learn Spark RDD in great detail](https://data-flair.training/blogs/apache-spark-rdd-tutorial/).
- Spark Dataframe APIs – Unlike an RDD, the data is organized into __named columns__, like a table in a relational database. It is an immutable distributed collection of data. A DataFrame allows developers to impose a structure onto a distributed collection of data, providing a higher-level abstraction. Follow [this link to learn Spark DataFrame in detail](http://data-flair.training/blogs/apache-spark-sql-dataframe-tutorial/).
- Spark Dataset APIs – Datasets in Apache Spark are an __extension__ of the DataFrame API that provides a type-safe, object-oriented programming interface. Datasets take advantage of Spark’s Catalyst optimizer by exposing expressions and data fields to the query planner. Follow [this link to learn Spark DataSet in detail](http://data-flair.training/blogs/apache-spark-dataset-tutorial/).
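The three APIs can be compared on the same data. A minimal sketch, assuming Spark 2.x and a hypothetical `Reading` case class:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical record type used only for this illustration
case class Reading(tool: String, value: Double)

object ThreeApisSketch {
  def build(spark: SparkSession): (RDD[Reading], DataFrame, Dataset[Reading]) = {
    import spark.implicits._
    val rows = Seq(Reading("tool 1", 0.7), Reading("tool 2", 0.3))

    val rdd: RDD[Reading] = spark.sparkContext.parallelize(rows) // opaque JVM objects
    val df: DataFrame = rows.toDF()          // named columns tool, value; untyped Rows
    val ds: Dataset[Reading] = rows.toDS()   // named columns plus the compile-time type Reading
    (rdd, df, ds)
  }
}
```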

##### Data Representation
- RDD – An RDD is a distributed collection of data elements spread across many machines in the cluster. RDDs are a set of Java or Scala objects representing data.
- DataFrame – A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database.
- DataSet – An extension of the DataFrame API that combines the type-safe, object-oriented programming interface of the RDD API with the performance benefits of the Catalyst query optimizer and the off-heap storage mechanism of the DataFrame API.

##### Data Formats
- RDD – It can easily and efficiently process both structured and unstructured data. Unlike DataFrames and Datasets, however, an RDD does not infer the schema of the ingested data; the user has to specify it.
- DataFrame – It can process structured and unstructured data efficiently. It organizes the data into named columns, which allows Spark to manage the schema.
- DataSet – It also efficiently processes structured and unstructured data. It represents data as JVM objects (rows or collections of typed objects), which encoders convert into a tabular representation.
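The schema difference can be sketched as follows, assuming Spark 2.x; the column names echo the notebook below but the rows are made up. An RDD of `Row` objects carries no schema of its own, so one is attached explicitly with a `StructType` when the DataFrame is created:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object SchemaSketch {
  // Explicit schema: column names, types, and nullability
  val schema = StructType(Seq(
    StructField("TOOL", StringType, nullable = false),
    StructField("VALUE_1", DoubleType, nullable = true)
  ))

  def withSchema(spark: SparkSession): DataFrame = {
    // The RDD itself only holds untyped Rows; the structure exists only in our code
    val rowRdd = spark.sparkContext.parallelize(Seq(Row("tool 1", 0.71), Row("tool 2", 0.96)))
    spark.createDataFrame(rowRdd, schema) // the DataFrame now knows and manages the schema
  }
}
```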

##### Data Sources API
- RDD – An RDD can come from __any data source__, e.g. a text file or a database via JDBC, and can easily handle data with __no predefined structure__.
- DataFrame – The Data Source API allows processing data in different formats (Avro, CSV, JSON) and from different storage systems (HDFS, Hive tables, MySQL). DataFrames can both read from and write to these sources.
- DataSet – The Dataset API also supports data from different sources.
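A round trip through the Data Source API might look like this; a sketch assuming Spark 2.x, with made-up rows and a temporary directory standing in for real storage. Swapping `"json"` for `"csv"` or `"parquet"` uses the same reader and writer calls:

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataSourceSketch {
  // Writes a small DataFrame as JSON, then reads it back through the same API
  def roundTrip(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val dir = Files.createTempDirectory("ds-sketch").resolve("tools").toString

    Seq(("tool 1", 986), ("tool 2", 35)).toDF("TOOL", "VALUE_2")
      .write
      .mode("overwrite")
      .format("json")
      .save(dir)

    spark.read.format("json").load(dir) // schema is inferred from the JSON records
  }
}
```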

##### Immutability and Interoperability
- RDD – An RDD contains a collection of partitioned records. The basic unit of parallelism in an RDD is the partition. Each partition is a logical division of data that is __immutable__ and is created by a transformation on existing partitions. Immutability helps keep computations consistent. You can convert an RDD to a DataFrame (if the RDD has a tabular structure) with the `toDF()` method, and convert back with the `.rdd` method.
- DataFrame – After transforming into a DataFrame, you cannot regenerate the original domain object. For example, if you generate testDF from testRDD, `testDF.rdd` gives you an RDD of generic `Row` objects, not the original RDD of the test class.
- DataSet – It overcomes this limitation of DataFrames: a Dataset can be converted back to an RDD of the original type. Datasets also allow you to convert existing RDDs and DataFrames into Datasets.
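The conversions described above can be sketched like this, assuming Spark 2.x; `TestRecord` is a hypothetical stand-in for the "test class" mentioned in the text:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Hypothetical domain class for the illustration
case class TestRecord(name: String, score: Int)

object InteropSketch {
  def convert(spark: SparkSession): (Array[Row], Array[TestRecord]) = {
    import spark.implicits._
    val testRDD: RDD[TestRecord] =
      spark.sparkContext.parallelize(Seq(TestRecord("a", 1), TestRecord("b", 2)))

    val testDF = testRDD.toDF()                        // RDD -> DataFrame
    val rowRDD: RDD[Row] = testDF.rdd                  // back to an RDD, but of generic Rows
    val testDS: Dataset[TestRecord] = testDF.as[TestRecord] // DataFrame -> typed Dataset
    val typedRDD: RDD[TestRecord] = testDS.rdd         // Dataset -> RDD of the original class
    (rowRDD.collect(), typedRDD.collect())
  }
}
```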

##### Optimization
- RDD – RDDs have no built-in optimization engine. When working with structured data, RDDs cannot take advantage of Spark's advanced optimizers, such as the Catalyst optimizer and the Tungsten execution engine. Developers have to optimize each RDD based on its attributes.
- DataFrame – Optimization is done by the Catalyst optimizer. DataFrames use the Catalyst tree transformation framework in four phases: a) analyzing a logical plan to resolve references, b) logical plan optimization, c) physical planning, and d) code generation to compile parts of the query to Java bytecode. An overview of the optimization phases is shown in the figure below:
<br>
<img src="img/Spark-SQL-Optimization.jpg" height="auto" width="800">
<br>
- Dataset – It uses the same Catalyst optimizer as DataFrames to optimize the query plan.
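To see these phases for an actual query, `explain(true)` prints the parsed, analyzed, and optimized logical plans together with the physical plan. A minimal sketch, assuming Spark 2.x and made-up data:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ExplainSketch {
  // A small query whose plans Catalyst analyzes, optimizes, and turns into a physical plan
  def highValueTools(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("tool 1", 986), ("tool 2", 35), ("tool 1", 914)).toDF("TOOL", "VALUE_2")
      .filter(col("VALUE_2") > 900)
      .select("TOOL")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explain-sketch").getOrCreate()
    // Prints the parsed, analyzed, and optimized logical plans plus the physical plan
    highValueTools(spark).explain(true)
    spark.stop()
  }
}
```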

##### Serialization
- RDD – Whenever Spark needs to distribute the data within the cluster or write it to disk, it uses Java serialization by default. Serializing individual Java and Scala objects is expensive and requires sending both data and structure between nodes.
- DataFrame – A DataFrame can serialize data into off-heap storage (in memory) in a binary format and then perform many transformations directly on this off-heap memory, because Spark understands the schema. There is no need to use Java serialization to encode the data. DataFrames use the Tungsten physical execution backend, which explicitly manages memory and dynamically generates bytecode for expression evaluation.
- DataSet – For serialization, the Dataset API has the concept of an encoder, which handles conversion between JVM objects and the tabular representation. The tabular representation is stored in Spark's internal Tungsten binary format. Datasets can perform operations on serialized data, which improves memory use, and allow on-demand access to individual attributes without deserializing the entire object.
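Encoders can be inspected directly. A minimal sketch, assuming Spark 2.x and a hypothetical `Lot` case class: `Encoders.product` derives the encoder, and its `schema` shows the tabular layout that the encoder maps each object onto.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical record type for the illustration
case class Lot(tool: String, value1: Double, value2: Int)

object EncoderSketch {
  // The encoder maps each Lot field to a column of Spark's internal binary row format
  val lotEncoder: Encoder[Lot] = Encoders.product[Lot]

  def main(args: Array[String]): Unit =
    lotEncoder.schema.printTreeString() // prints the column names and types
}
```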

##### Efficiency/Memory Use
- RDD – Efficiency decreases because serialization is performed individually on each Java and Scala object, which takes a lot of time.
- DataFrame – Using off-heap memory for serialization reduces the overhead. Bytecode is generated dynamically so that many operations can be performed directly on the serialized data; small operations need no deserialization.
- DataSet – Operations can be performed on serialized data, which improves memory use and allows on-demand access to individual attributes without deserializing the entire object.

##### Aggregation
- RDD – The RDD API is slower at simple grouping and aggregation operations.
- DataFrame – The DataFrame API is very easy to use. It is faster for exploratory analysis and for creating aggregated statistics on large datasets.
- DataSet – The Dataset API is faster at aggregation operations on large datasets.
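The difference in aggregation style can be seen by computing the same per-tool average both ways. A sketch assuming Spark 2.x and made-up data; note that the RDD version has to carry (sum, count) pairs by hand:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, count}

object AggregationSketch {
  val data = Seq(("tool 1", 986), ("tool 1", 35), ("tool 2", 991))

  // RDD API: the average is assembled manually from (sum, count) pairs
  def rddAverage(spark: SparkSession): Map[String, Double] =
    spark.sparkContext.parallelize(data)
      .mapValues(v => (v.toLong, 1L))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(p => p._1.toDouble / p._2)
      .collect()
      .toMap

  // DataFrame API: declarative aggregation, planned by the Catalyst optimizer
  def dfAverage(spark: SparkSession): DataFrame = {
    import spark.implicits._
    data.toDF("TOOL", "VALUE_2")
      .groupBy("TOOL")
      .agg(avg("VALUE_2").as("avg_value_2"), count("*").as("n"))
  }
}
```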

##### Usage Area
- RDD – Use RDDs when you want low-level transformations and actions and fine control over your dataset; when your data is unstructured, such as media streams or streams of text; when you want to manipulate your data with functional programming constructs rather than domain-specific expressions; and when you do not need to impose a schema, such as a columnar format, while processing or accessing data attributes by name or column.

- DataFrame and DataSet – Use either DataFrames or Datasets when you need a high level of abstraction and domain-specific APIs, and when your processing calls for high-level expressions such as filters, maps, aggregations, sums, SQL queries, and columnar access. If you also want a higher degree of type safety at compile time, use Datasets.
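The compile-time type safety that favors Datasets can be sketched as follows, assuming Spark 2.x and a hypothetical `Measurement` case class:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type for the illustration
case class Measurement(tool: String, value2: Int)

object TypeSafetySketch {
  def highValues(spark: SparkSession): (Long, Long) = {
    import spark.implicits._
    val ds: Dataset[Measurement] =
      Seq(Measurement("tool 1", 986), Measurement("tool 2", 35)).toDS()

    // Dataset: a typo such as _.valu2 is rejected by the compiler
    val typed = ds.filter(_.value2 > 900).count()
    // DataFrame: a typo inside the string, e.g. "valu2 > 900", only fails at runtime
    val untyped = ds.toDF().filter("value2 > 900").count()
    (typed, untyped)
  }
}
```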

#### RDD Actions and Transformations

<img src="img/CORE_ACTIONS_TRANSFORMATIONS.png" width="800" height="auto" >

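- A Spark transformation is a function that produces a __new RDD__ from __existing RDDs__. Transformations are lazy: they only record how to compute the result.
- Actions are Spark RDD operations that return __non-RDD values__. Calling an action triggers the actual computation.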
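Laziness is easy to observe: the transformations below only build up a lineage, and nothing runs until an action is called. A minimal sketch, assuming a local Spark 2.x session:

```scala
import org.apache.spark.sql.SparkSession

object LazySketch {
  def run(spark: SparkSession): (Long, Array[Int]) = {
    val numbers = spark.sparkContext.parallelize(1 to 10)

    // Transformations: return new RDDs and only extend the lineage; no job runs yet
    val evens = numbers.filter(_ % 2 == 0)
    val doubled = evens.map(_ * 2)

    // Actions: each one triggers a job and returns a plain (non-RDD) value to the driver
    val n = doubled.count()
    val firstThree = doubled.take(3)
    (n, firstThree)
  }
}
```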

#### From a file
```scala
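sc.textFile("data/dataset_1.csv")   // RDD[String]: one element per line
sc.wholeTextFiles("data")           // RDD[(String, String)]: one (path, content) pair per file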
```
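The two file readers differ in what one element of the resulting RDD is. A sketch assuming Spark 2.x, which writes two small temporary files first:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object FileRddSketch {
  def run(spark: SparkSession): (Long, Long) = {
    val sc = spark.sparkContext
    val dir = Files.createTempDirectory("rdd-files")
    Files.write(dir.resolve("a.txt"), "line 1\nline 2\n".getBytes(StandardCharsets.UTF_8))
    Files.write(dir.resolve("b.txt"), "line 3\n".getBytes(StandardCharsets.UTF_8))

    val lines = sc.textFile(dir.toString)        // one element per line across all files
    val files = sc.wholeTextFiles(dir.toString)  // one (path, content) element per file
    (lines.count(), files.count())
  }
}
```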

#### From a Data Source (SQL, NoSQL)
```scala
spark.sql
```
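For SQL sources, `spark.sql` returns a DataFrame, and its `.rdd` method exposes the rows as an `RDD[Row]`. A minimal sketch, assuming Spark 2.x; the view name `lots` is hypothetical and backed by made-up rows rather than a real database:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object SqlRddSketch {
  def run(spark: SparkSession): RDD[Row] = {
    import spark.implicits._
    Seq(("tool 1", 986), ("tool 2", 35)).toDF("TOOL", "VALUE_2")
      .createOrReplaceTempView("lots") // hypothetical view name

    // spark.sql returns a DataFrame; .rdd turns it into an RDD[Row]
    spark.sql("SELECT TOOL FROM lots WHERE VALUE_2 > 900").rdd
  }
}
```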

#### In code
```scala
sc.parallelize(Seq(1, 2, 3))   // distributes a local collection as an RDD
```
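`parallelize` also takes an optional number of partitions (`numSlices`). A minimal sketch, assuming a local Spark 2.x session:

```scala
import org.apache.spark.sql.SparkSession

object ParallelizeSketch {
  def run(spark: SparkSession): (Int, Int) = {
    // The second argument (numSlices) sets how many partitions the collection is split into
    val rdd = spark.sparkContext.parallelize(1 to 100, 4)
    (rdd.getNumPartitions, rdd.reduce(_ + _))
  }
}
```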

\* Spark Streaming is also a way to create RDDs, but it is not covered in this training.

## READ DATA FROM FILE AND LOAD INTO DATAFRAME


```scala
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2`

import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._
```



```scala
val spark = JupyterSparkSession.builder() // important - call this rather than SparkSession.builder()
  .jupyter() // this method must be called straightaway after builder()
  // .yarn("/etc/hadoop/conf") // optional, for Spark on YARN - argument is the Hadoop conf directory
  // .emr("2.6.4") // on AWS ElasticMapReduce, this adds AWS-related jars to the Spark jar list
  .master("local") // change to "yarn" on YARN
  .config("spark.executor.instances", "2")
  .config("spark.executor.memory", "3g")
  // .config("spark.hadoop.fs.s3a.access.key", awsCredentials._1)
  // .config("spark.hadoop.fs.s3a.secret.key", awsCredentials._2)
  .appName("notebook")
  .getOrCreate()
```


```scala
val sc = spark.sparkContext
```

```scala
import spark.implicits._
```

```scala
case class LotInfo(TOOL: String, STEP: String, MATERIAL: String, V1: Double, V2: Int)
```

```scala
// Parse each CSV line into a LotInfo (a header row would make toDouble/toInt throw)
val textFile = sc.textFile("data/dataset_1.csv")
  .map(line => line.split(","))
  .map(p => LotInfo(p(0), p(1), p(2), p(3).toDouble, p(4).toInt))
```
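As an alternative to splitting lines by hand, the built-in CSV reader can reuse the case class as the schema and return a typed Dataset. A sketch assuming Spark 2.x and a header-less file; `LotInfo` is repeated so that the block stands alone:

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

case class LotInfo(TOOL: String, STEP: String, MATERIAL: String, V1: Double, V2: Int)

object LotInfoSketch {
  // Reads header-less CSV lines into a typed Dataset[LotInfo]
  def read(spark: SparkSession, path: String): Dataset[LotInfo] = {
    import spark.implicits._
    spark.read
      .schema(Encoders.product[LotInfo].schema) // column names and types taken from LotInfo
      // add .option("header", "true") here if the file starts with a header row
      .csv(path)
      .as[LotInfo]
  }
}
```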

```scala
val df = spark.read
  .format("com.databricks.spark.csv") // Spark 2.x also accepts the built-in name "csv"
  .option("header", "true") // reading the headers
  .option("mode", "DROPMALFORMED")
  .load("data/dataset_2.csv")
```

```
df: DataFrame = [TOOL: string, STEP: string ... 3 more fields]
```

```scala
df.show(5)
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.958547461|     35|
|tool 1|step 1|material 1|0.542267277|    589|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.820385525|    500|
+------+------+----------+-----------+-------+
only showing top 5 rows
```



```scala
df.select($"TOOL").show(5)
```

```
+------+
|  TOOL|
+------+
|tool 1|
|tool 1|
|tool 1|
|tool 1|
|tool 1|
+------+
only showing top 5 rows
```



```scala
df.filter("TOOL == 'tool 1'").show
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.958547461|     35|
|tool 1|step 1|material 1|0.542267277|    589|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.820385525|    500|
|tool 1|step 1|material 1|0.116683078|    914|
|tool 1|step 1|material 2|0.830316155|    783|
|tool 1|step 1|material 2|0.296609434|    261|
|tool 1|step 1|material 2|0.381470171|    139|
|tool 1|step 1|material 2|0.285614758|    663|
+------+------+----------+-----------+-------+
```



```scala
df.filter("VALUE_2 > 900").show
```

```
+------+------+----------+-----------+-------+
|  TOOL|  STEP|  MATERIAL|    VALUE_1|VALUE_2|
+------+------+----------+-----------+-------+
|tool 1|step 1|material 1|0.715171243|    986|
|tool 1|step 1|material 1|0.778020313|    967|
|tool 1|step 1|material 1|0.116683078|    914|
|tool 2|step 2|material 3|0.359787374|    991|
|tool 3|step 2|material 2|0.230862465|    912|
|tool 3|step 2|material 3|0.765056471|    949|
+------+------+----------+-----------+-------+
```



## READ DATA FROM HIVE AND LOAD INTO DATAFRAME

```scala
import spark.sql
sc.setLogLevel("ERROR")
val hql2: String ="""
select
   sample_date,
   sample_id,
   ch_id,
   ckc_id,
   max(sys_tt_upd_ts) as sys_tt_upd_ts,
   avg(ext_ewma_mv) as ext_ewma_mv,
   avg(ext_ewma_mv_ucl) as ext_ewma_mv_ucl,
   avg(ext_ewma_mv_lcl) as ext_ewma_mv_lcl,
   avg(ext_sigma_ucl) as ext_sigma_ucl,
   avg(ext_sigma_lcl) as ext_sigma_lcl,
   avg(ext_range_ucl) as ext_range_ucl,
   avg(ext_range_lcl) as ext_range_lcl,
   avg(ext_mv_ucl) as ext_mv_ucl,
   avg(ext_mv_lcl) as ext_mv_lcl
from prod_mti_singapore_fab_10_spc_dm.samples_calc
where sys_part_yyyy = 2018 and sys_part_mm = 06
group by sample_date, sample_id, ch_id, ckc_id"""

val df2 = sql(hql2)
df2.show(10)    
```
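The same aggregation can also be expressed with the DataFrame API instead of a HiveQL string. A partial sketch, assuming Spark 2.x, that covers the filter, the grouping, and the first few aggregates; in a Hive-enabled session it could be called as `SamplesAggSketch.aggregate(spark.table("prod_mti_singapore_fab_10_spc_dm.samples_calc"))`:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, max}

object SamplesAggSketch {
  // DataFrame-API version of (part of) the HiveQL query above
  def aggregate(samples: DataFrame): DataFrame =
    samples
      .where(col("sys_part_yyyy") === 2018 && col("sys_part_mm") === 6)
      .groupBy("sample_date", "sample_id", "ch_id", "ckc_id")
      .agg(
        max("sys_tt_upd_ts").as("sys_tt_upd_ts"),
        avg("ext_ewma_mv").as("ext_ewma_mv"),
        avg("ext_ewma_mv_ucl").as("ext_ewma_mv_ucl"),
        avg("ext_ewma_mv_lcl").as("ext_ewma_mv_lcl")
        // the remaining avg(...) columns follow the same pattern
      )
}
```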

<img src="img/df-show.png" width="800" height="600">

## SAVE DATAFRAME

```scala
df2
.repartition(1)
.write
.format("csv")
.option("sep", "\t")
.mode("overwrite")
.option("header", "false")
.save("/eng/mti/singapore/fab_10/qm69a_v2/input_data/space/charts_daily/query_date=" + "20180619")

sql("ALTER TABLE eng_mti_singapore_fab_10_autodiagnostic_v2.space_charts_daily ADD IF NOT EXISTS partition (query_date=\""+ "20180619" +"\") location \"/eng/mti/singapore/fab_10/qm69a_v2/input_data/space/charts_daily/query_date="+ '20180619' +"\"")

```
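Instead of building the `query_date=...` path by hand, the writer can create the partition directory with `partitionBy`. A sketch assuming Spark 2.x, with a temporary directory standing in for the real HDFS location; the Hive table still needs the partition registered afterwards, for example with `ALTER TABLE ... ADD PARTITION` as above or with `MSCK REPAIR TABLE`:

```scala
import java.nio.file.Files
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object PartitionedWriteSketch {
  // Writes tab-separated files into <base>/query_date=<date>/ and returns <base>
  def write(df: DataFrame, queryDate: String): String = {
    val base = Files.createTempDirectory("charts_daily").toString // stand-in output location
    df.withColumn("query_date", lit(queryDate))
      .repartition(1)
      .write
      .partitionBy("query_date") // creates the query_date=... sub-directory
      .option("sep", "\t")
      .mode("append")
      .csv(base)
    base
  }
}
```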

## READ DATA FROM HBASE

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.spark.{SparkConf, SparkContext}

sc.setLogLevel("ERROR")

def printRow(result : Result) = {
val cells = result.rawCells();
print( Bytes.toString(result.getRow) + " : " )
for(cell <- cells){
  val col_name = Bytes.toString(CellUtil.cloneQualifier(cell))
  val col_value = Bytes.toString(CellUtil.cloneValue(cell))
  print("(%s,%s) ".format(col_name, col_value))
}
println()
}

object SharedHBaseConnection extends Serializable{
  private var sharedConnection: Option[Connection] = None

  lazy val config = {
    val config = HBaseConfiguration.create()
    config.addResource(new Path("file:///usr/hdp/current/hbase-client/conf/hbase-site.xml"))
    config
  }

  def apply(): Connection = synchronized {
    sharedConnection.getOrElse {
      val connection = ConnectionFactory.createConnection(config)
      sharedConnection = Some(connection)
      connection
    }
  }
}

val hbaseConnection = SharedHBaseConnection()
val dataTable = hbaseConnection.getTable(TableName.valueOf("prod_mti_singapore_fab_10_sigma:sigma_wafer"))
println("connection created")

println("Get Example:")
var get = new Get(Bytes.toBytes("1707981_0711-01"))
var result = dataTable.get(get)
printRow(result)

dataTable.close()
hbaseConnection.close()

System.exit(0)

```

## READ DATA FROM HDFS AND DO WORD COUNT

```scala
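// The .deflate extension is decompressed automatically by Hadoop's DefaultCodec
val sqlText = sc.textFile("/user/hdfsf10n/pengtan/spark_hive_test/query_date=20180513/part-00000-e7587264-467d-46c0-bfdc-2364b9a09d6e.csv.deflate")

// Split lines into words, pair each word with 1, and sum the counts per word
val count = sqlText
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .collect() // brings all (word, count) pairs to the driver
count.foreach(x => println(x))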
```
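The same word count can be tried without HDFS on an in-memory collection. A sketch assuming Spark 2.x; unlike the version above, it splits on any whitespace, drops empty tokens, and returns only the `n` most frequent words so that nothing large is collected to the driver:

```scala
import org.apache.spark.sql.SparkSession

object WordCountSketch {
  // Split into words, pair each with 1, sum per word, then sort by descending count
  def topWords(spark: SparkSession, lines: Seq[String], n: Int): Array[(String, Int)] =
    spark.sparkContext.parallelize(lines)
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortBy(wc => (-wc._2, wc._1)) // highest count first, ties broken alphabetically
      .take(n)
}
```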

## Introduction to spark-shell

### Difference between spark-submit and spark-shell

- spark-shell initializes the SparkSession itself; it is available directly as `spark`, and its SparkContext as `sc`.
- With spark-submit, you need to create and configure the SparkSession in your code.
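For spark-submit, a minimal application skeleton could look like this; a sketch assuming Spark 2.x, where the object name and app name are hypothetical. It would be launched with something like `spark-submit --class SparkSubmitApp --master yarn --queue eng_f10w-01 app.jar`, where `app.jar` stands for the packaged application:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical application object, packaged into a jar and launched with spark-submit
object SparkSubmitApp {
  def run(spark: SparkSession): Int =
    spark.sparkContext.parallelize(1 to 100).reduce(_ + _)

  def main(args: Array[String]): Unit = {
    // Unlike spark-shell, nothing is pre-created: build the session yourself.
    // Master, memory, cores, and queue are taken from the spark-submit command line.
    val spark = SparkSession.builder()
      .appName("spark_test_app")
      .getOrCreate()
    println(s"sum = ${run(spark)}")
    spark.stop()
  }
}
```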

### open a spark-shell

```shell
export SPARK_MAJOR_VERSION=2
spark-shell --master yarn --driver-memory 4g --executor-memory 2g --executor-cores 2 --driver-cores 1 --queue eng_f10w-01 --name spark_test_f10ds_pengtan
```

### tricks when using spark-shell

`sc` (SparkContext) and `spark` (SparkSession) are created automatically.

```scala
:paste
```
Use `:paste` to paste a large block of code; press Ctrl+D to finish.

Be careful with __tabs__ in pasted code, since the shell may treat them as tab-completion requests; replace them with 4 spaces instead.