NOTE: Remember to use the reco-pyspark environment!

# Spark
- fast real-time processing framework for big data applications
- does in-memory computations to analyse data in real time (an improvement over the earlier Apache Hadoop MapReduce, which performs only batch processing)
- Apache Spark can perform both real-time stream processing and batch processing
- Supports interactive queries and iterative algorithms
- Has its own cluster manager to host its applications
- Can use HDFS (Hadoop Distributed File System) for storage; Spark has no storage layer of its own
- At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. 

# PySpark
- Apache Spark is written in Scala
- PySpark is the Python API for Spark, so Spark applications can be written in Python

## RDD
- The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. 
- RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing collection in the driver program (a Scala collection in the Scala API, a Python collection in PySpark), and transforming it. 

## PySpark - SparkContext
- entry point to any Spark functionality
- tells Spark how to access a Spark cluster
- To create a SparkContext you first need to build a SparkConf object that contains information about your application.

In [56]:
import sys
import pyspark
from pyspark import SparkConf, SparkContext

In [2]:
print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.6.10 |Anaconda, Inc.| (default, Mar 25 2020, 23:51:54) 
[GCC 7.3.0]
Spark version: 2.3.1


In [64]:
conf = SparkConf().setAppName("First App").setMaster("local")
sc = SparkContext(conf=conf)

- appName: a name to show on the cluster UI
- master: a Spark, Mesos or YARN cluster URL, or a special "local" string.

## Resilient Distributed Datasets (RDDs)
- fault-tolerant collection of elements that can be operated on in parallel.
- Two ways to create RDDs:
 - parallelizing an existing collection in your driver program, or 
 - referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

### Parallelised Collections
- Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.

In [65]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

- Once created, the distributed dataset (distData) can be operated on in parallel. 

In [66]:
# Add up elements using reduce
distData.reduce(lambda a, b: a + b) 

15

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).

### External Datasets
PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

- The textFile method creates text file RDDs
 - Takes in file location
 - reads it as a collection of lines

In [70]:
distFile = sc.textFile("README.md").cache()

Once created, distFile can be acted on by dataset operations. 
- add up the sizes of all lines using the map and reduce operations
 - Reduce: Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 


In [71]:
lineLengths = distFile.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

In [73]:
lineLengths

PythonRDD[7] at RDD at PythonRDD.scala:49

In [72]:
totalLength

4456
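
The requirement that the reduce function be commutative and associative can be illustrated without a cluster. The sketch below is plain Python, not Spark: simulate_partitioned_reduce is a hypothetical helper that slices a list into partitions, reduces each slice, and then combines the partial results, which roughly mirrors how a distributed reduce proceeds. Addition gives the same answer however the data is split; subtraction does not.

```python
from functools import reduce


def simulate_partitioned_reduce(data, num_partitions, func):
    """Hypothetical helper (not a Spark API): reduce each slice, then combine."""
    size = -(-len(data) // num_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


data = [1, 2, 3, 4, 5]


def add(a, b):
    return a + b


def subtract(a, b):
    return a - b


# Addition is commutative and associative: the split does not matter.
sum_one_partition = simulate_partitioned_reduce(data, 1, add)
sum_two_partitions = simulate_partitioned_reduce(data, 2, add)

# Subtraction is neither: the result changes with the number of partitions.
diff_one_partition = simulate_partitioned_reduce(data, 1, subtract)
diff_two_partitions = simulate_partitioned_reduce(data, 2, subtract)

print("add:", sum_one_partition, sum_two_partitions)
print("subtract:", diff_one_partition, diff_two_partitions)
```

The helper always combines partial results in order. A real cluster gives no such guarantee, which is why commutativity matters as well as associativity.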

- filter: Return a new dataset formed by selecting those elements of the source on which func returns true
- count: Return the number of elements in the dataset

In [60]:
numAs = distFile.filter(lambda s: 'a' in s).count()
numBs = distFile.filter(lambda s: 'b' in s).count()

In [61]:
print("Lines with a:{}, lines with b:{}".format(numAs, numBs))

Lines with a:14, lines with b:12


In [62]:
sc.stop()

### SparkFiles resolves file paths

In [17]:
from pyspark import SparkFiles
path = "test.txt"

In [18]:
with open(path, "w") as testFile:
    _ = testFile.write("100")

In [25]:
sc_2 = SparkContext(master = "local", appName="Second App")

In [26]:
sc_2.addFile(path)

In [27]:
def func(iterator):
    with open(SparkFiles.get("test.txt")) as testFile:
        fileVal = int(testFile.readline())
        return [x * fileVal for x in iterator]

- parallelize: distribute a local Python collection to form an RDD
- map: Return a new distributed dataset formed by passing each element of the source through a function func. 
- mapPartitions: Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator< T > => Iterator< U > when running on an RDD of type T. 
- collect(): Return all the elements of the dataset as an array at the driver program.

In [28]:
sc_2.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()

[100, 200, 300, 400]

In [29]:
sc_2.stop()

### RDD Operations
RDDs support two types of operations: 
- transformations, which create a new dataset from an existing one, and
 - e.g. map passes dataset through a function and returns a new RDD
- actions, which return a value to the driver program after running a computation on the dataset. 
 - e.g. reduce aggregates all elements of the RDD and returns the final result to the driver program
 
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. 

### Passing Functions to Spark
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this:
- Lambda expressions, for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or statements that do not return a value.)
- Local defs inside the function calling into Spark, for longer code.
- Top-level functions in a module.


In [None]:
# more to learn
# https://spark.apache.org/docs/latest/rdd-programming-guide.html

# References
- https://www.tutorialspoint.com/pyspark/pyspark_quick_guide.htm
- https://spark.apache.org/docs/latest/api/python/pyspark.html
- https://spark.apache.org/docs/latest/rdd-programming-guide.html
- https://www.youtube.com/watch?v=XrpSRCwISdk&t=20s Pyspark for Data Scientists
- https://www.youtube.com/watch?v=5dARTeE6OpU PySpark tutorial