## Resilient Distributed Datasets and DataFrames
### 1. What is an RDD
**The primary abstraction in Spark: a dataset partitioned across a cluster of machines. An RDD is defined as a fault-tolerant collection of elements that can be operated on in parallel. RDDs are also immutable.**  
### 2. How can they be created
**Three methods:**
1. Parallelizing data in Spark, meaning distributing an existing collection across machines. Parallelizing is an operation that returns a pointer to the new RDD.  
2. Reading from any storage system supported by Hadoop, for example:  
   - Cassandra  
   - HBase  
   - HDFS  
   - Amazon S3  

   Multiple file types can be read:  
   - text files, SequenceFiles, and other Hadoop InputFormats  

   Reading from any of these sources creates an RDD and returns a pointer to it.  
3. Deriving from other RDDs, when a transformation is performed on them.  

#### 3. What happens when an RDD is created
A DAG (directed acyclic graph) is created when an RDD is created.
#### 4. What operations can be performed on them
- Transformations: these update the DAG and return a pointer to the new RDD, but do not compute its value.  
- Actions: calling an action evaluates the DAG and returns a value.
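
The difference between transformations and actions can be sketched in plain Python, with no Spark required. `LazyDataset` is a hypothetical toy class invented for this illustration, not part of Spark: its transformations only record a step, and nothing runs until an action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD (hypothetical, not a Spark class)."""

    def __init__(self, source, lineage=()):
        self._source = source            # the original data
        self._lineage = tuple(lineage)   # recorded (kind, function) steps: the "DAG"

    # Transformations: return a new LazyDataset with a longer lineage; compute nothing.
    def map(self, fn):
        return LazyDataset(self._source, self._lineage + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._lineage + (("filter", pred),))

    # Actions: walk the recorded lineage and return a value.
    def collect(self):
        items = list(self._source)
        for kind, fn in self._lineage:
            if kind == "map":
                items = [fn(x) for x in items]
            else:  # "filter"
                items = [x for x in items if fn(x)]
        return items

    def count(self):
        return len(self.collect())


calls = []  # records every time the mapped function actually runs


def square(x):
    calls.append(x)
    return x * x


squares = LazyDataset(range(1, 6)).map(square)   # transformation: nothing runs yet
evens = squares.filter(lambda x: x % 2 == 0)     # transformation: still nothing runs
calls_before_action = len(calls)
result = evens.collect()                         # action: the lineage is evaluated now
```

Each transformation returns a new object rather than modifying the old one, which mirrors the immutability of RDDs described above.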


### 3. Scala: Creating and working with RDDs

The location where Spark is installed is given by the environment variable `$SPARK_HOME`. Launch the Spark shell from `$SPARK_HOME`:  
./bin/spark-shell  

In [None]:
// Create some data
val data = 1 to 10000     // the val keyword declares an immutable value
// Parallelize the data to create an RDD
val distData = sc.parallelize(data)  // sc (the SparkContext) is predefined in the shell
// Perform a transformation (the predicate here is only an example)
distData.filter(x => x < 10)

In [None]:
// Another way is to load a file
val data = sc.textFile("file.txt")

In [None]:
// Loading from HDFS
val data = sc.textFile("hdfs://lines.txt")
// Apply a transformation
val llength = data.map(line => line.length)
// Invoke an action
val totallth = llength.reduce((a,b) => a+b)  // a binary function is written as (a,b) followed by its definition


#### Word Count example

In [None]:
val words = data.flatMap(line => line.split(" ")).
map(word => (word,1)).
reduceByKey((a,b) => a+b)

words.collect()
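
To make the three steps of the word count concrete, here is a plain-Python sketch that mimics what flatMap, map and reduceByKey each contribute. It does not use Spark, and the two input lines are made up for illustration.

```python
from collections import defaultdict

lines = ["to be or not", "to be"]  # made-up input

# flatMap: split every line and flatten the results into one list of words
words = [word for line in lines for word in line.split(" ")]

# map: pair each word with a count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: combine the values that share a key, using (a, b) => a + b
counts = defaultdict(int)
for word, n in pairs:
    counts[word] = counts[word] + n

word_counts = dict(counts)
```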

### 4. Directed Acyclic Graph

In [None]:
// After transformations have been applied to an RDD, the DAG can be viewed.
// In Scala, one available method is toDebugString:
val rdd1 = readme.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(wrd => (wrd, 1))
val rdd3 = rdd2.reduceByKey((a,b) => a+b)
rdd3.toDebugString

# In Python the DAG can be viewed as
rdd1 = logFile.filter(lambda line: 'INFO' in line)
rdd2 = rdd1.flatMap(lambda line: line.split(' '))
rdd3 = rdd2.map(lambda wrd: (wrd, 1))
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)  # reduceByKey needs a two-argument function, not sum
rdd4.toDebugString()

In [2]:
# Example output:
# (2) ShuffledRDD[30] at reduceByKey at <console>:29 []
# +-(2) MapPartitionsRDD[29] at map at <console>:27 []
#    |  MapPartitionsRDD[28] at flatMap at <console>:25 []
#    |  MapPartitionsRDD[2] at textFile at <console>:23 []
#    |  /resources/jupyter/labs/BD0211EN/LabData/README.md HadoopRDD[1] at textFile at <console>:23 []
    

Read from bottom to top, the DAG above shows the transformations in order: the text file is read, then the flatMap and map transformations are applied, and finally the reduceByKey transformation.  
**Fault tolerance on node failure comes from this DAG: another node takes a copy of it and re-executes it up to the point of failure.**
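
The recovery idea can be sketched in plain Python: if the recorded steps are kept, a lost partition can be rebuilt from its source data by replaying them. This is only an illustration of the principle under simplified assumptions; `run_lineage` and the data are hypothetical, and Spark's real scheduler is far more involved.

```python
def run_lineage(partition, lineage):
    """Apply every recorded step, in order, to one partition of source data."""
    for step in lineage:
        partition = step(partition)
    return partition


# Source data split into two partitions (as if stored on two nodes).
source_partitions = [["a b", "c"], ["d e f"]]

# Lineage: a flatMap-like split, then a map-like pairing with 1.
lineage = [
    lambda part: [w for line in part for w in line.split(" ")],
    lambda part: [(w, 1) for w in part],
]

computed = [run_lineage(p, lineage) for p in source_partitions]

# Simulate losing the second partition's result on a failed node...
computed[1] = None
# ...and rebuild only that partition by replaying the lineage on its source data.
computed[1] = run_lineage(source_partitions[1], lineage)
```

Only the lost partition is recomputed; the first partition's result is left untouched.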

### Step-by-step execution of the DAG, with a log file analysis example

#### 1. When the file is first read, it is partitioned across the nodes in the cluster

In [None]:
// Count occurrences of 'insecure' and 'encryption' in the warning messages of a log file
val logFile = sc.textFile("notebook.log")


#### 2. The driver sends the transformations and actions below to each node. The executors on each node *read the data from that node* and perform the tasks in parallel. After performing the tasks, the results are returned to the driver

In [None]:
val warn = logFile.filter(line => line.contains("WARNING") )
val tokens = warn.flatMap(line => line.split(" "))

#### 3. Caching - another lazy operation, like a transformation

In [None]:
// Caching
tokens.cache()

#### 4. Action 1 - after the first action completes, the RDD tokens is cached

In [None]:
// Action 1
tokens.filter(word => word == "insecure").count()

#### 5. Action 2 - this action now uses the cached RDD and is much faster

In [None]:
// Action 2
tokens.filter(word => word == "encryption").count()
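
Why the second action is faster can be illustrated with a plain-Python sketch that counts how often the expensive pipeline actually runs. `CachedTokens`, `tokenize` and the log lines are hypothetical stand-ins, not Spark APIs or real data.

```python
stats = {"tokenize_runs": 0}  # how many times the pipeline was actually computed


def tokenize(lines):
    """Stand-in for the filter + flatMap pipeline above."""
    stats["tokenize_runs"] += 1
    return [w for line in lines if "WARNING" in line for w in line.split(" ")]


class CachedTokens:
    """Computes the tokens on first use, then serves them from memory."""

    def __init__(self, lines):
        self._lines = lines
        self._cache = None

    def get(self):
        if self._cache is None:       # first action: compute and keep the result
            self._cache = tokenize(self._lines)
        return self._cache            # later actions: reuse the stored result


log_lines = [  # made-up log lines
    "WARNING insecure login",
    "INFO ok",
    "WARNING encryption off insecure",
]
tokens = CachedTokens(log_lines)

insecure_count = sum(1 for w in tokens.get() if w == "insecure")      # action 1
encryption_count = sum(1 for w in tokens.get() if w == "encryption")  # action 2
```

Both counts are answered, yet the pipeline ran only once; without the cache it would run once per action.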

### 5. RDD persistence 

Spark allows RDD partitions, including partitions produced by transformations, to be stored in memory on the nodes, which makes
later computations on them much faster (almost 10x). This is an important feature to leverage for iterative work.

Two methods: 
1. persist()  
2. cache(), which is persist() with the MEMORY_ONLY storage level   

| Storage level | Meaning |
| --- | --- |
| MEMORY_ONLY | Persist the RDD in memory only |
| MEMORY_AND_DISK | Spill to disk only if necessary, then read from disk when an action requires it |
| MEMORY_ONLY_SER | Memory only, but stored as serialized Java objects, which take less space but need deserialization and increase CPU time |
| MEMORY_AND_DISK_SER | Like MEMORY_AND_DISK, but stored serialized |
| DISK_ONLY | Disk only |
| MEMORY_ONLY_2, DISK_ONLY_2, etc. | Keep a copy on two nodes, persisted in memory or on disk respectively |
| OFF_HEAP | Executors use shared off-heap memory |

If the RDD is small and likely to fit in memory, use cache().  
If not, try a serialized storage level, with a fast serializer.  
Try not to spill to disk unless the computations that produced the data are expensive.  
Tachyon is a good option when your environment runs multiple applications and has high memory requirements.  

### 6. Shared Variables

Two types of shared variables are available in Spark:  
1. Broadcast variables    
Usually a separate copy of each variable is used on every node when performing computations. Broadcast variables are useful for passing  
read-only information to worker nodes; these variables are immutable.
2. Accumulators  
  - Variables that are passed to worker nodes and can be added to by worker nodes.  
  - Only the driver can read the accumulator value.  
  - They are used to implement counters and sums.  
  - Spark natively supports numeric types, but other types can be defined.

In [None]:
// Create a broadcast variable
val bcVar = sc.broadcast(Array(1,2,3,4))
// Access its value
bcVar.value

## Python
broadcastVar = sc.broadcast([1,2,3])
broadcastVar.value


In [None]:
// Create an accumulator variable
val acc = sc.accumulator(0)
acc.value

## Python
accum = sc.accumulator(0)

In [None]:
// Create an RDD and perform an associative operation that adds to the accumulator
val rdd = sc.parallelize(Array(1,2,3,4))
rdd.foreach(x => acc += x)    // the accumulator variable is accessible inside the function passed to foreach
// Check the accumulator value
acc.value

## Python 
rdd = sc.parallelize([1,2,3,4])
def f(x):                   # accum must be declared global inside the function before it can be updated
    global accum
    accum += x

rdd.foreach(f)  
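
The division of roles between the two shared-variable types can be sketched without Spark. Everything here is hypothetical illustration: `Accumulator` is a toy class, not `pyspark.Accumulator`, and the lookup table and partitions are made up. The read-only table plays the part of a broadcast variable, and tasks may only add to the accumulator.

```python
from types import MappingProxyType


class Accumulator:
    """Toy accumulator (hypothetical): tasks only add; the driver reads the total."""

    def __init__(self, initial=0):
        self._total = initial

    def add(self, amount):
        self._total += amount

    @property
    def value(self):
        return self._total


# "Broadcast": one read-only lookup table shared by every task.
country_names = MappingProxyType({"IN": "India", "US": "United States"})

bad_records = Accumulator(0)


def task(partition):
    """Runs on a 'worker': reads the shared table and adds to the accumulator."""
    resolved = []
    for code in partition:
        if code in country_names:
            resolved.append(country_names[code])
        else:
            bad_records.add(1)   # count records that could not be resolved
    return resolved


partitions = [["IN", "XX"], ["US", "IN", "??"]]   # made-up input
results = [task(p) for p in partitions]
unresolved = bad_records.value                    # read back on the "driver"
```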

### 7. Key-Value Pairs and Programming with Them

**There are special operations available for RDDs of key-value pairs.**
- **Common operations are grouping and aggregating by key, such as reduceByKey.**   
- **They require Tuple2 objects, which can be created with the notation (a,b) in Scala, and require importing the SparkContext implicits with
  `import org.apache.spark.SparkContext._`**    
- **Pair RDD functions contain the key-value operations, such as reduceByKey((a,b) => a+b).**    
- **Custom objects used in key-value pairs require custom methods: an equals() method with a matching hashCode() method.**  

Key-value pairs in Scala:  
`val pair = ('a', 'b')`, with elements accessed as `pair._1` and `pair._2`    
In Python:     
`pair = ('a', 'b')`, with elements accessed as `pair[0]` and `pair[1]`  
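
The equals()/hashCode() requirement has a direct analogue in Python, where a custom key type needs `__eq__` and a matching `__hash__`. The sketch below uses a plain dictionary rather than Spark to show why: two separately created keys with the same content must land in the same group. `Medallion` is a hypothetical class invented for the example.

```python
class Medallion:
    """Hypothetical custom key type wrapping a taxi medallion id."""

    def __init__(self, ident):
        self.ident = ident

    def __eq__(self, other):
        return isinstance(other, Medallion) and self.ident == other.ident

    def __hash__(self):
        # Equal objects must produce equal hashes, so hash the field __eq__ compares.
        return hash(self.ident)


pairs = [(Medallion("A1"), 1), (Medallion("B2"), 1), (Medallion("A1"), 1)]

counts = {}
for key, value in pairs:
    counts[key] = counts.get(key, 0) + value   # reduce-by-key with addition

a1_trips = counts[Medallion("A1")]
```

Without the two methods, each `Medallion("A1")` instance would be treated as a distinct key and the counts would never be combined.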

In [None]:
val rdd1 = readme.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(wrd => (wrd, 1))  // creates an RDD of key-value pairs
val rdd3 = rdd2.reduceByKey(_+_) // a pair RDD operation; _+_ is shorthand for summing the values

In [None]:
// ################# Example with NYC dataset #######################
val taxi = sc.textFile("/resources/jupyter/labs/BD0211EN/LabData/nyctaxi.csv")
val taxiParse = taxi.map(line=>line.split(","))
val taxiMedKey = taxiParse.map(vals=>(vals(7), 1))  // create a pair RDD keyed on the medallion number,
// to count trips per medallion
val taxiMedCounts = taxiMedKey.reduceByKey((v1,v2)=>v1+v2)

// Swap key and value in each tuple so that top(10) orders by trip count
for (pair <-taxiMedCounts.map(_.swap).top(10)) println("Taxi Medallion %s had %s Trips".format(pair._2, pair._1))

// Note the shorthand _.swap instead of x => x.swap

### 8. Joining RDDs

In [None]:
# Read and filter one file, then make a pair RDD
## Scala
val readmeCount = readmeFile.filter(line => line.contains("Spark")).
flatMap(line => line.split(" ")).
map(wrd => (wrd,1)).
reduceByKey((a,b) => a+b)
readmeCount.count()

## Python
readmeCount = readmeFile.                    \
    flatMap(lambda line: line.split(" ")).   \
    map(lambda word: (word, 1)).             \
    reduceByKey(lambda a, b: a + b)

In [None]:
# Repeat for second RDD
## Scala
val pomCount = pom.filter(line => line.contains("Spark")).
flatMap(line => line.split(" ")).
map(wrd => (wrd,1)).
reduceByKey((a,b) => a+b)
pomCount.count()

## Python 
pomCount = pomFile.                          \
    flatMap(lambda line: line.split(" ")).   \
    map(lambda word: (word, 1)).             \
    reduceByKey(lambda a, b: a + b)

In [None]:
# Do an inner join 
## Scala
val joined = readmeCount.join(pomCount)

## Python
joined = readmeCount.join(pomCount)

In [None]:
# Count 'Spark' keyword in both and print
## Scala
joined.map(a => (a._1,(a._2)._1 + (a._2)._2) ).
collect.foreach(println)

## Python
joinedSum = joined.map(lambda k: (k[0], (k[1][0]+k[1][1])))
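
What the inner join and the final map produce can be seen in a small plain-Python sketch. The two count tables are made-up values, not the real README or pom contents, and dictionaries stand in for the pair RDDs.

```python
readme_counts = {"Spark": 3, "the": 2, "build": 1}   # made-up counts
pom_counts = {"Spark": 1, "build": 4, "maven": 2}    # made-up counts

# Inner join: keep only the keys present on both sides, pairing their values.
joined = {
    key: (readme_counts[key], pom_counts[key])
    for key in readme_counts
    if key in pom_counts
}

# Sum the paired counts, as the map over the joined RDD does.
joined_sum = {key: left + right for key, (left, right) in joined.items()}
```

Keys that appear on only one side ("the", "maven") are dropped, which is the defining behaviour of an inner join.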

# DataFrame using Python API

**Explore:**
1. How to create a Spark DataFrame  
2. How to perform group-by and aggregation operations   
3. How to run SQL queries on a Spark DataFrame 

In Python, the pandas library provides a DataFrame structure, along with ways to create and manipulate DataFrames.

In [None]:
# Creating a Spark DataFrame requires creating a SQLContext from the SparkContext
from pyspark.sql import SQLContext
sqlcontext = SQLContext(sc)

In [None]:
# Create a Spark DataFrame from a pandas DataFrame, using the methods available on sqlcontext
# (mtcars is assumed to be a pandas DataFrame loaded earlier)
sdf = sqlcontext.createDataFrame(mtcars)
# Describe the frame
sdf.printSchema()


In [None]:
# Explore sample rows -  show() method
sdf.show(5)

In [None]:
# Selecting columns - select method
sdf.select('mpg').show(5)

In [None]:
# Filtering - filter method
sdf.filter(sdf['mpg'] < 10).show(5)        # indexing with [] works as it does for a pandas DataFrame

In [None]:
# Creating a new column - withColumn method
sdf2 = sdf.withColumn('wtTon', sdf['wt'] * 0.45) # returns a new Spark DataFrame; the original sdf is not modified

In [None]:
# Grouping and aggregation
sdf3 = sdf.groupby(['cyl'])\
.agg({"wt": "AVG",
     "mpg" : 'SUM'})   # the dictionary passed to agg maps each column to its aggregation function
sdf3.show(5)           # show() returns None, so it is called separately instead of being assigned to sdf3
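
For readers who want to see what a group-by with a per-column aggregation computes, here is a plain-Python sketch of the same shape of operation. The rows are made up and are not the real mtcars values; the result keys are labelled `avg(wt)` and `sum(mpg)` only for readability.

```python
from collections import defaultdict

rows = [  # made-up rows, not the real mtcars data
    {"cyl": 4, "wt": 2.0, "mpg": 30.0},
    {"cyl": 4, "wt": 3.0, "mpg": 26.0},
    {"cyl": 6, "wt": 3.5, "mpg": 20.0},
]

# Group the rows by the value of the 'cyl' column.
groups = defaultdict(list)
for row in rows:
    groups[row["cyl"]].append(row)

# Aggregate each group: average of 'wt' and sum of 'mpg'.
summary = {
    cyl: {
        "avg(wt)": sum(r["wt"] for r in members) / len(members),
        "sum(mpg)": sum(r["mpg"] for r in members),
    }
    for cyl, members in groups.items()
}
```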

In [None]:
# Sorting by an aggregated column (the average weight computed above is named avg(wt))
sdf3.sort('avg(wt)', ascending=False).show(5)

#### Running SQL Queries

In [None]:
# 1. Register the DataFrame as a table using the registerTempTable method
sdf.registerTempTable("cars")
# 2. SQL statements can then be run using the sql method
highgearcars = sqlcontext.sql("SELECT gear FROM cars WHERE cyl >= 4 AND cyl <= 9")
highgearcars.show(6)
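
The same query shape can be tried without a cluster. The sketch below swaps in Python's built-in `sqlite3` module as the SQL engine, purely as an illustration; it is not how Spark SQL executes queries. The table contents are made-up rows, and the `ORDER BY` is added only to make the result order predictable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")   # throwaway in-memory database
conn.execute("CREATE TABLE cars (name TEXT, cyl INTEGER, gear INTEGER)")
conn.executemany(
    "INSERT INTO cars VALUES (?, ?, ?)",
    [  # made-up rows, not the real mtcars data
        ("car_a", 4, 4),
        ("car_b", 6, 3),
        ("car_c", 8, 5),
        ("car_d", 12, 6),
    ],
)

# Same WHERE clause as the Spark SQL query above.
rows = conn.execute(
    "SELECT gear FROM cars WHERE cyl >= 4 AND cyl <= 9 ORDER BY name"
).fetchall()
gears = [gear for (gear,) in rows]
conn.close()
```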