### The Spark Programming Model
- A Spark program operates on a data set that usually resides in some form of distributed, persistent storage (e.g. HDFS)
- writing one typically consists of the following steps:
    - Define a set of transformations on the input data set.
    - Invoke actions that output the transformed data sets to persistent storage or return results to the driver's local memory.
    - Run local computations that operate on the results computed in a distributed fashion.
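
The three steps above can be sketched on a small in-memory collection standing in for data in distributed storage. The session setup line and the names `numbers`, `evens`, `squares`, and `total` are assumptions made for illustration; inside ```spark-shell``` the existing session is simply reused.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("model-sketch").getOrCreate()

// Input data set (a local range stands in for data in distributed storage).
val numbers = spark.sparkContext.parallelize(1 to 10)

// 1. Transformations: these only describe the computation, nothing runs yet.
val evens = numbers.filter(_ % 2 == 0)
val squares = evens.map(n => n * n)

// 2. Action: runs the distributed job and returns the results to the driver.
val result = squares.collect()

// 3. Local computation on the results that were computed in a distributed fashion.
val total = result.sum
```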
    
### Record Linkage
- the problem of tying multiple duplicate records to the same underlying entity when we have a large collection of records from one or more source systems
- the difficulty is that the criteria for deciding whether two records are duplicates vary from case to case
    - in some cases, very different-looking records refer to the same entity; in other cases, very similar-looking records refer to different entities
    
##### spark-shell instructions
- if running examples on personal computer, can launch a local Spark cluster by specifying ```--master local[N]```, where N is the number of threads to run
    - specifying local[\*] will match the number of threads to the number of cores available on machine
- other arguments
    - ```--driver-memory 2g``` -> lets single local process use 2 GB of memory

In [1]:
// The SparkContext object
sc

org.apache.spark.SparkContext@7dda3abe

#### Resilient Distributed Datasets
- ```SparkContext``` has methods that allow us to create _Resilient Distributed Datasets_, or _RDDs_, which are Spark's abstraction for representing a collection of objects that can be distributed across multiple machines in a cluster
- two ways to create _RDDs_
    - use ```SparkContext``` to create RDD from external data source
    - perform a transformation on one or more existing RDDs, yielding an RDD as a result (e.g. filtering records, aggregating records by common key, joining multiple RDDs together)
- _RDDs_ are laid out across the cluster of machines as a collection of _partitions_, each including a subset of the data
    - Spark then processes the objects within a partition in sequence, and processes multiple partitions in parallel
- One simple way to create an RDD is to call the ```parallelize``` method on ```SparkContext``` with a local collection of objects
    - first arg is the collection of objects to parallelize (any local ```Seq```, such as an ```Array```)
    - second arg (optional) is the number of partitions to create

In [2]:
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
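
To check how a collection was split into partitions, something along these lines may help. ```getNumPartitions``` and ```glom``` are standard RDD methods; the variable names and the session setup are assumptions made for this sketch.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("partition-sketch").getOrCreate()

// Same call as in the cell above: four elements spread over four partitions.
val partitioned = spark.sparkContext.parallelize(Array(1, 2, 2, 4), 4)

// Number of partitions backing the RDD.
val numPartitions = partitioned.getNumPartitions

// glom() turns each partition into an array, so collect() reveals the layout.
val layout = partitioned.glom().collect().map(_.toList).toList
```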

- to create an RDD from a text file or directory of text files, pass the name of the file or directory to ```textFile```
    - ```textFile``` can access paths that reside on the local file system
    - if given a directory, it will consider all of the files in that directory as part of the given RDD
    - no data is read by Spark or loaded into memory at this point; instead, objects are loaded into the cluster at computation time

In [3]:
val rawblocks = sc.textFile("linkage")

#### The REPL and Compilation

- Spark supports both interactive shell and compiled applications, which can be compiled and managed using _Apache Maven_
- shell method
    - starting work in the REPL enables quick prototyping, faster iteration, and less lag between ideas and results
    - drawbacks: not suited for large programs, since Scala interpretation takes longer
- hybrid method
    - develop in the REPL, but move established pieces of code into compiled library
    - ```spark-shell``` can use compiled JAR files with the ```--jars``` flag

### Bringing Data from the Cluster to the Client

- RDDs have various methods for reading data from the cluster into the Scala REPL on the client
- ```RDD.first``` returns the first element of the RDD to the client

In [4]:
rawblocks.first

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

- ```RDD.collect``` returns all the contents of an RDD to the client as an array
    - not recommended for huge data sets
- ```RDD.take``` allows us to read a given number of records into an array on the client

In [5]:
val head = rawblocks.take(10)
head.length

10

- creating an RDD does not cause distributed computation to take place on the cluster
- instead, RDDs define logical data sets that are more like intermediate computation steps
- distributed computation occurs upon invoking an _action_ on an RDD
    - e.g. the ```count``` action returns the number of objects in the RDD

In [6]:
rdd.count()

4

In [7]:
// brings objects from the RDD into local memory as an Array
rdd.collect()

Array(1, 2, 2, 4)
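
One way to observe that transformations are lazy is to count how often the function passed to ```map``` actually runs. This is only a sketch: the accumulator name ```mapCalls``` and the other variable names are made up for illustration, and accumulator updates made inside transformations can be applied more than once if a task is retried, so the exact count is only reliable for a simple local run like this one.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("lazy-sketch").getOrCreate()

// Counts invocations of the map function across all tasks.
val mapCalls = spark.sparkContext.longAccumulator("mapCalls")

// Defining the transformation does not execute it.
val doubled = spark.sparkContext.parallelize(Array(1, 2, 2, 4), 4).map { n =>
  mapCalls.add(1)
  n * 2
}
val callsBeforeAction: Long = mapCalls.value

// The action triggers the distributed computation.
val numElements = doubled.count()
val callsAfterAction: Long = mapCalls.value
```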

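- ```saveAsTextFile``` saves the contents of an RDD to persistent storage
    - creates a directory, and writes out each partition as a separate file
    - this created directory might be used as an input directory by a future Spark job
    - fails with ```FileAlreadyExistsException``` if the output directory already exists, as in the run below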

In [8]:
rdd.saveAsTextFile("numbers")

Name: org.apache.hadoop.mapred.FileAlreadyExistsException
Message: Output directory file:/home/chtka/Projects/Spark/numbers already exists
StackTrace:   at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1184)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:10

In [9]:
val rdd2 = sc.textFile("numbers")
rdd2.collect()

Array(1, 2, 4, 2)
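
A possible way to avoid the ```FileAlreadyExistsException``` above is to delete an earlier output directory before saving. The sketch below assumes a throwaway location under a temp directory (the path and variable names are hypothetical) and uses the Hadoop ```FileSystem``` API that ships with Spark. Note that reading the files back yields strings, and that the order of elements may differ from the original, as the output above shows.

```scala
import java.nio.file.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("save-sketch").getOrCreate()

// Hypothetical output location under a fresh temp directory.
val outDir = Files.createTempDirectory("numbers-sketch").resolve("numbers").toUri.toString
val outPath = new Path(outDir)

val numbersRdd = spark.sparkContext.parallelize(Array(1, 2, 2, 4), 4)

// Remove any earlier output first, since saveAsTextFile refuses to overwrite.
val fs = outPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.exists(outPath)) fs.delete(outPath, true)

numbersRdd.saveAsTextFile(outDir)

// textFile yields lines of text, so convert back to Int; sort because the
// order in which the part files are read back is not guaranteed.
val readBack = spark.sparkContext.textFile(outDir).map(_.toInt).collect().sorted
```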

- ```foreach``` method can be used in conjunction with ```println``` to print out each value in the array on its own line:

In [10]:
head.foreach(println)

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
6698,40542,1,1,1,?,1,1,1,1,1,TRUE
45037,49220,1,?,1,?,1,1,1,1,1,TRUE
31835,69902,1,?,1,1,1,1,1,1,1,TRUE
4356,31352,0.875,?,1,?,1,1,1,1,1,TRUE
45723,49837,1,?,1,?,1,1,1,1,1,TRUE
39716,49297,1,?,1,?,1,1,1,1,1,TRUE
71970,71971,1,?,1,?,1,1,1,1,1,TRUE
96601,96625,1,?,1,?,1,1,1,1,1,TRUE
28553,71491,1,?,1,?,1,1,1,1,1,TRUE


- examining the data, we see a header row that we might want to remove

In [11]:
// true for the header row, which is the only line containing the text "id_1"
def isHeader(line: String): Boolean = line.contains("id_1")

In [14]:
// head.filterNot(isHeader)
head.filter(x => !isHeader(x))

Array(6698,40542,1,1,1,?,1,1,1,1,1,TRUE, 45037,49220,1,?,1,?,1,1,1,1,1,TRUE, 31835,69902,1,?,1,1,1,1,1,1,1,TRUE, 4356,31352,0.875,?,1,?,1,1,1,1,1,TRUE, 45723,49837,1,?,1,?,1,1,1,1,1,TRUE, 39716,49297,1,?,1,?,1,1,1,1,1,TRUE, 71970,71971,1,?,1,?,1,1,1,1,1,TRUE, 96601,96625,1,?,1,?,1,1,1,1,1,TRUE, 28553,71491,1,?,1,?,1,1,1,1,1,TRUE)
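
The two variants in the cell above, plus Scala's placeholder syntax, should all produce the same result. A small plain-Scala sketch, with no Spark involved; the header line is shortened for readability and the two data lines are copied from the output earlier in these notes:

```scala
def isHeader(line: String): Boolean = line.contains("id_1")

// Shortened header plus two data lines taken from the sample output.
val sample = Array(
  "\"id_1\",\"id_2\",\"is_match\"",
  "6698,40542,1,1,1,?,1,1,1,1,1,TRUE",
  "45037,49220,1,?,1,?,1,1,1,1,1,TRUE"
)

// Explicit anonymous function.
val viaLambda = sample.filter(x => !isHeader(x))

// Placeholder syntax, equivalent to the lambda above.
val viaPlaceholder = sample.filter(!isHeader(_))

// filterNot keeps the elements for which the predicate is false.
val viaFilterNot = sample.filterNot(isHeader)
```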

### Shipping Code from the Client to the Cluster
- we can interactively develop and debug data-munging code against a small sample pulled from the cluster, then apply the same code to the entire data set on the cluster once it's ready

In [12]:
val noheader = rawblocks.filter(x => !isHeader(x))

In [14]:
noheader.first

6698,40542,1,1,1,?,1,1,1,1,1,TRUE
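
The same idea in one place: a predicate is tried on a local sample first and then applied unchanged to the RDD. This is a sketch under assumptions: a three-line in-memory collection (with a shortened header) stands in for the real ```linkage``` files, and the variable names are made up.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("ship-sketch").getOrCreate()

def isHeader(line: String): Boolean = line.contains("id_1")

// Stand-in for the full data set: a shortened header and two rows from the notes.
val lines = Seq(
  "\"id_1\",\"id_2\",\"is_match\"",
  "6698,40542,1,1,1,?,1,1,1,1,1,TRUE",
  "45037,49220,1,?,1,?,1,1,1,1,1,TRUE"
)
val linesRdd = spark.sparkContext.parallelize(lines)

// Client side: test the logic on a small local sample.
val localResult = linesRdd.take(3).filterNot(isHeader)

// Cluster side: the same predicate is shipped to and run on the executors.
val clusterResult = linesRdd.filter(x => !isHeader(x)).collect()
```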

### From RDDs to Data Frames

- Spark's ```DataFrame``` is an abstraction built on top of RDDs for data sets with regular structure
    - each row is made up of a set of columns, and each column has a well-defined data type
    - basically the Spark analogue of a table in a relational database
    - differs from Python's ```pandas.DataFrame``` in that it represents a distributed data set on a cluster, instead of local data
- ```SparkSession``` is a wrapper around the ```SparkContext``` object
- can create a Data Frame with the ```csv``` method on ```SparkSession```'s Reader API

In [15]:
val prev = spark.read.csv("linkage")

In [16]:
prev.show()

+--------------------+--------------------+--------------------+----+----+----+----+----+----+----+----+----+
|                 _c0|                 _c1|                 _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9|_c10|_c11|
+--------------------+--------------------+--------------------+----+----+----+----+----+----+----+----+----+
[rows omitted: unreadable binary values, the first beginning with PK; output truncated in the source]

- Spark can do some data processing while parsing, like inferring column names from a header, recognizing null values, and inferring the data types of each column


In [1]:
val parsed = spark.read.
    option("header", "true").
    option("nullValue", "?").
    option("inferSchema", "true").
    csv("linkage1")

In [2]:
parsed.show()

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|25739|45991|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|  

- we can examine the schema of the ```parsed``` Data Frame with ```printSchema```
    - each ```StructField``` contains the name of the column, the most specific data type that could handle the type of data contained in each record, and a boolean field that indicates whether the column may contain null values
    - to infer the schema, Spark does _two_ passes over the data set: one pass to figure out column types, and a second pass to do the actual parsing
    - if schema is known in advance, can create instance of ```org.apache.spark.sql.types.StructType``` and pass to Reader API via ```schema``` function, possibly saving significant resources when the data set is very large

In [5]:
parsed.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)
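
If the schema is known in advance, it can be passed to the reader so that no inference pass is needed. The sketch below builds a ```StructType``` from the column names and types printed above and reads a tiny stand-in CSV file; the temp file, its two data rows (copied from earlier output), and the variable names are assumptions for illustration only.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("schema-sketch").getOrCreate()

// Column names and types copied from the printSchema output above.
val idCols = Seq("id_1", "id_2").map(StructField(_, IntegerType))
val nameCols = Seq("cmp_fname_c1", "cmp_fname_c2", "cmp_lname_c1", "cmp_lname_c2").
  map(StructField(_, DoubleType))
val otherCols = Seq("cmp_sex", "cmp_bd", "cmp_bm", "cmp_by", "cmp_plz").
  map(StructField(_, IntegerType))
val schema = StructType(idCols ++ nameCols ++ otherCols :+ StructField("is_match", BooleanType))

// Tiny stand-in file: a header row plus two data rows from the notes.
val rows = Seq(
  schema.fieldNames.mkString(","),
  "6698,40542,1,1,1,?,1,1,1,1,1,TRUE",
  "45037,49220,1,?,1,?,1,1,1,1,1,TRUE"
)
val csvFile = Files.createTempFile("linkage-sketch", ".csv")
Files.write(csvFile, rows.mkString("\n").getBytes(StandardCharsets.UTF_8))

// schema(...) replaces inferSchema, so Spark does not need the extra pass.
val parsedWithSchema = spark.read.
  option("header", "true").
  option("nullValue", "?").
  schema(schema).
  csv(csvFile.toUri.toString)
```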



- through ```DataFrameReader``` and ```DataFrameWriter``` APIs, Spark supports reading and writing data frames in a variety of formats
    - _json_ - similar functionality to CSV format
    - _parquet_ and _orc_ - columnar-oriented binary file formats
    - _jdbc_ - connects to relational database via JDBC data connection standard
    - _libsvm_ - popular text file format for representing labeled observations with sparse features
    - _text_ - maps each line of a file to a data frame with a single column of type ```String```
- access ```DataFrameReader``` API through ```read``` method on a ```SparkSession``` instance
    - load data from file using either combination of ```format``` and ```load``` methods, or one of the shortcuts for built-in formats
- to write out data, access ```DataFrameWriter``` via the ```write``` method on any DataFrame instance
- by default, Spark throws an error if you try to save a data frame to a file that already exists; control this behavior using the ```SaveMode``` enum, with ```Overwrite```, ```Append```, and ```Ignore``` options

// val d1 = spark.read.format("json").load("file.json")
// val d2 = spark.read.json("file.json")
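
A round trip through the writer and reader APIs might look like the following. It is a sketch under assumptions: the output path under a temp directory is hypothetical, and a three-row data frame from ```spark.range``` stands in for real data.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("io-sketch").getOrCreate()

// Hypothetical output path under a fresh temp directory.
val out = Files.createTempDirectory("io-sketch").resolve("numbers.json").toUri.toString

val df = spark.range(3).toDF("n")

// The first write succeeds because nothing exists at the path yet.
df.write.json(out)

// With the default save mode, writing to an existing path is an error.
val secondWriteFailed = Try(df.write.json(out)).isFailure

// Overwrite replaces the existing data; Append adds to it.
df.write.mode(SaveMode.Overwrite).json(out)
df.write.mode(SaveMode.Append).json(out)

// Two equivalent ways to read: format + load, or the shortcut method.
val viaLoad = spark.read.format("json").load(out)
val viaShortcut = spark.read.json(out)
```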

### Analyzing Data with the DataFrame API

- Every time we've processed the data set so far, Spark has re-opened the file, reparsed the rows, and then performed the requested action
- Instead of doing this, we can save the data in its parsed form on the cluster
- we can accomplish this via the ```cache``` method on the Data Frame instance
- the ```cache``` call indicates that the contents of the DataFrame should be stored in memory the next time it's computed
    - so in this example, the call to ```count``` does the re-opening, reparsing, and action (counting)
    - the call to ```take``` accesses the cached data instead

In [11]:
parsed.cache()

[id_1: int, id_2: int ... 10 more fields]

In [12]:
parsed.count()

5749132

In [13]:
parsed.take(10)

Array([3148,8326,1.0,null,1.0,null,1,1,1,1,1,true], [14055,94934,1.0,null,1.0,null,1,1,1,1,1,true], [33948,34740,1.0,null,1.0,null,1,1,1,1,1,true], [946,71870,1.0,null,1.0,null,1,1,1,1,1,true], [64880,71676,1.0,null,1.0,null,1,1,1,1,1,true], [25739,45991,1.0,null,1.0,null,1,1,1,1,1,true], [62415,93584,1.0,null,1.0,null,1,1,1,1,0,true], [27995,31399,1.0,null,1.0,null,1,1,1,1,1,true], [4909,12238,1.0,null,1.0,null,1,1,1,1,1,true], [15161,16743,1.0,null,1.0,null,1,1,1,1,1,true])
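
The caching lifecycle can be inspected with ```storageLevel``` and released with ```unpersist```. A minimal sketch, assuming a small generated data frame in place of ```parsed```; the variable names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Assumption: reuse the shell's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()

// Small stand-in for the parsed data frame.
val df = spark.range(1000).toDF("n")

// cache() only marks the data frame for caching; nothing is computed yet.
df.cache()
val levelAfterCache = df.storageLevel

// The first action computes the data and stores it.
val firstCount = df.count()

// Later actions can be served from the cached copy.
val secondCount = df.count()

// Release the cached data once it is no longer needed.
df.unpersist()
val levelAfterUnpersist = df.storageLevel
```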