In [101]:
import os
import shutil
from operator import add

import findspark
from pyspark import SparkContext, SparkFiles, StorageLevel
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.serializers import MarshalSerializer

In [72]:
# Locate the Spark installation and the directory holding the example files.
# Both default to the original local paths but can be overridden through
# environment variables, so the notebook is not tied to one user's machine.
findspark.init(os.environ.get("SPARK_HOME", "/Users/owner/spark"))
work_dir = os.environ.get("SPARK_EXAMPLES_DIR", "/Users/owner/Documents/git/spark/examples")

# RDDs
RDD stands for Resilient Distributed Dataset. An RDD is a collection of elements partitioned across the nodes of a cluster so that it can be processed in parallel. RDDs are immutable and fault tolerant, i.e. they recover automatically in case of failure. You can apply multiple operations to an RDD to achieve a certain task.

There are two ways to apply operations on RDDs:
1. __Transformation__: An operation that is applied to an RDD to create a new RDD. `filter`, `groupBy` and `map` are examples of transformations.
2. __Action__: An operation that is applied to an RDD and instructs Spark to perform the computation and send the result back to the driver.

In [73]:
# On a fresh kernel `sc` does not exist yet, so an unconditional sc.stop()
# raises NameError. Only stop a context left over from a previous run.
if "sc" in globals():
    sc.stop()
sc = SparkContext(appName="examples")

Let's create an RDD.

In [74]:
# Distribute a small in-memory list of strings as an RDD.
words = sc.parallelize([
    "scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark",
])

### count
Number of elements in the RDD is returned.

In [75]:
# count() is an action: it returns the number of elements in the RDD.
counts = words.count()
print(f'Number of elements in RDD -> {counts}')

Number of elements in RDD -> 8


### collect
All the elements in the RDD are returned.

In [76]:
# collect() brings every element of the RDD back to the driver as a list.
coll = words.collect()
print(f'Elements in RDD -> {coll}')

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


### filter
A new RDD is returned containing the elements that satisfy the function passed to `filter`. In the following example, we keep only the strings containing "spark".

In [77]:
# Keep only the elements whose text contains the substring 'spark'.
words_filter = words.filter(lambda word: 'spark' in word)
# filter() is lazy; collect() triggers the computation.
filtered = words_filter.collect()
print(f'Filtered RDD -> {filtered}')

Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


### map
A new RDD is returned by applying a function to each element in the RDD. In the following example, we form a key value pair and map every string with a value of 1.

In [78]:
# Turn every string into a (string, 1) key/value pair.
words_map = words.map(lambda word: (word, 1))
# map() is lazy; collect() triggers the computation.
mapping = words_map.collect()
print(f'Key value pair -> {mapping}')

Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]


### cache
Persist this RDD with the default storage level (MEMORY_ONLY). You can also check if the RDD is cached or not.

In [79]:
# cache() already marks the RDD for persistence at the default storage level,
# so no additional persist() call is needed before reading is_cached.
words.cache()
caching = words.is_cached
print(f'Words got chached -> {caching}')

Words got chached -> True


### reduce
`reduce` combines the elements of the RDD using the specified commutative and associative binary operation and returns the single resulting value. In the following example, we import the `add` function from the `operator` module and apply it to `nums` to carry out a simple addition.

In [80]:
# Replace the running context with a fresh local one for the reduce example.
sc.stop()
sc = SparkContext(master="local", appName="Reduce_app")

In [81]:
# RDD holding the integers 1 through 5.
nums = sc.parallelize(list(range(1, 6)))

In [82]:
# Fold all elements into one value with operator.add, i.e. compute their sum.
adding = nums.reduce(add)
print(f'Adding all the elements -> {adding}')

Adding all the elements -> 15


### join
It returns an RDD of `(key, (value1, value2))` elements for the keys that are present in both RDDs. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD with elements having matching keys and their values.

In [83]:
# Replace the running context with a fresh local one for the join example.
sc.stop()
sc = SparkContext(master="local", appName="Join_app")

In [84]:
# Two pair RDDs that share the keys "spark" and "hadoop".
x = sc.parallelize([
    ("spark", 1),
    ("hadoop", 4),
])
y = sc.parallelize([
    ("spark", 2),
    ("hadoop", 5),
])

In [85]:
# Join the two pair RDDs on their keys; each result element has the form
# (key, (value_from_x, value_from_y)).
joined = x.join(y)
final = joined.collect()
print(f'Join RDD -> {final}')

Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]


# Parallel Processing
For parallel processing, Apache Spark uses shared variables. A copy of shared variable goes on each node of the cluster when the driver sends a task to the executor on the cluster, so that it can be used for performing tasks.

There are two types of shared variables supported by Apache Spark: __Broadcast__ and __Accumulator__.

## Broadcast
Broadcast variables are used to keep a read-only copy of data on all nodes. The variable is cached once on every machine rather than being shipped to the machines with each task.

In [86]:
# Replace the running context with a fresh local one for the broadcast example.
sc.stop()
sc = SparkContext(master="local", appName="Broadcast_app")

In [87]:
# Wrap a plain Python list in a broadcast variable.
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])

In [88]:
# .value exposes the broadcast list; index it like any other Python list.
data = words_new.value
elem = data[2]
print(f'Stored data -> {data}')
print(f'Printing a particular element in RDD -> {elem}')

Stored data -> ['scala', 'java', 'hadoop', 'spark', 'akka']
Printing a particular element in RDD -> hadoop


## Accumulator
Accumulator variables are used for aggregating the information through associative and commutative operations. For example, you can use an accumulator for a sum operation or counters (in MapReduce).

In [89]:
# Replace the running context with a fresh local one for the accumulator example.
sc.stop()
sc = SparkContext(master="local", appName="Accumulator app")

In [90]:
# Accumulator with an initial value of 10.
num = sc.accumulator(value=10)

In [91]:
def f(x):
    """Add one RDD element to the shared accumulator `num`."""
    # Calling add() performs the same update as `num += x`, but never rebinds
    # the name, so no `global` declaration is required.
    num.add(x)


rdd = sc.parallelize([20, 30, 40, 50])
# foreach applies f to every element purely for its side effect on `num`.
rdd.foreach(f)
final = num.value
print(f'Accumulated value is -> {final}')

Accumulated value is -> 150


# Files
In Apache Spark, you can upload your files using `sc.addFile` and get the path on a worker using `SparkFiles.get`. Thus, SparkFiles resolve the paths to files added through SparkContext.addFile().

SparkFiles contain the following classmethods
1. `get(filename)`
2. `getRootDirectory()`

In [92]:
# Replace the running context with a fresh local one for the SparkFiles example.
sc.stop()
sc = SparkContext(master="local", appName="SparkFile_App")

In [93]:
# Register a file from work_dir with the context, then ask SparkFiles for the
# path under which that file can be found.
file_to_add='intro_to_spark.ipynb'
testfile = f'{work_dir}/{file_to_add}'
sc.addFile(testfile)
# SparkFiles.get is called with the bare file name, not the original full path.
print(f'Absolute Path -> {SparkFiles.get(file_to_add)}')

Absolute Path -> /private/var/folders/0n/z4c7t4dd59zdjk155nrygs2r0000gn/T/spark-9e39d9ab-c9c3-4376-a5ca-2a696ad27497/userFiles-65b6676d-a1e7-4c35-8b8b-fe020185a9c0/intro_to_spark.ipynb


If you go to http://localhost:4040/environment/ then you'll be able to see your added file.

# StorageLevel
`StorageLevel` specifies how RDDs should be stored: in memory, disk, or both. It also specifies whether to serialize RDDs and whether to replicate RDD partitions.

The following code block has the class definition of a `StorageLevel`:

`class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)`

Now, to decide the storage of RDD, there are different storage levels, which are given below:

- __DISK_ONLY__: `StorageLevel(True, False, False, False, 1)`
- __DISK_ONLY_2__: `StorageLevel(True, False, False, False, 2)`
- __MEMORY_AND_DISK__: `StorageLevel(True, True, False, False, 1)`
- __MEMORY_AND_DISK_2__: `StorageLevel(True, True, False, False, 2)`
- __MEMORY_AND_DISK_SER__: `StorageLevel(True, True, False, False, 1)`
- __MEMORY_AND_DISK_SER_2__: `StorageLevel(True, True, False, False, 2)`
- __MEMORY_ONLY__: `StorageLevel(False, True, False, False, 1)`
- __MEMORY_ONLY_2__: `StorageLevel(False, True, False, False, 2)`
- __MEMORY_ONLY_SER__: `StorageLevel(False, True, False, False, 1)`
- __MEMORY_ONLY_SER_2__: `StorageLevel(False, True, False, False, 2)`
- __OFF_HEAP__: `StorageLevel(True, True, True, False, 1)`

Let us consider the following example of StorageLevel, where we use the storage level __MEMORY_AND_DISK_2__, which means RDD partitions will have replication of 2.

In [94]:
# Replace the running context with a fresh local one for the StorageLevel example.
sc.stop()
sc = SparkContext(master="local", appName="storagelevel app")

In [95]:
# Minimal two-element RDD used to demonstrate storage levels.
rdd1 = sc.parallelize([1,2])

In [97]:
# Persist the RDD in memory and on disk with a replication factor of 2, then
# print the storage level Spark reports for it.
rdd1.persist(StorageLevel.MEMORY_AND_DISK_2)
print(rdd1.getStorageLevel())

Disk Memory Serialized 2x Replicated


# MLlib
Apache Spark offers a Machine Learning API called MLlib. PySpark has this machine learning API in Python as well. Here are some of its offerings:
- __mllib.classification__: The spark.mllib package supports various methods for binary classification, multiclass classification and regression analysis. Some of the most popular algorithms in classification are Random Forest, Naive Bayes, Decision Tree, etc.
- __mllib.clustering__: Clustering is an unsupervised learning problem, whereby you aim to group subsets of entities with one another based on some notion of similarity.
- __mllib.fpm__: Frequent pattern mining, i.e. mining frequent items, itemsets, subsequences or other substructures, is usually among the first steps in analyzing a large-scale dataset. This has been an active research topic in data mining for years.
- __mllib.linalg__: MLlib utilities for linear algebra.
- __mllib.recommendation__: Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user item association matrix.
- __spark.mllib__: It currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.
- __mllib.regression__: Linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.

There are other algorithms, classes and functions in the mllib package as well. For now, let us walk through a demonstration of `pyspark.mllib.recommendation`.

In [98]:
# Replace the running context with a fresh one for the MLlib example.
# No master is passed here, unlike the "local" contexts used in the other sections.
sc.stop()
sc = SparkContext(appName="Pyspark_mllib_Example")

In [102]:
# Each input line is parsed as "user,product,rating".
# NOTE(review): this path is relative to the kernel's working directory, not
# to work_dir; confirm where test_data.txt is expected to live.
data = sc.textFile('test_data.txt')
ratings = (
    data
    .map(lambda l: l.split(','))
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
)

# Build the recommendation model using Alternating Least Squares.
# A fixed seed is passed so that repeated runs start from the same random
# initialisation instead of producing a different model every time.
rank = 10
numIterations = 10
seed = 42
model = ALS.train(ratings, rank, numIterations, seed=seed)

# Evaluate the model on the same ratings it was trained on. This is the
# training error, not a held-out test error.
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print(f'Mean Squared Error = {MSE}')

# Save and load model.
# Remove any copy left by an earlier run first, so that the model loaded into
# `sameModel` is always the one trained above rather than a stale one.
model_path = f'{work_dir}/myCollaborativeFilter'
shutil.rmtree(model_path, ignore_errors=True)
model.save(sc, model_path)
sameModel = MatrixFactorizationModel.load(sc, model_path)

Mean Squared Error = 8.338085482353445e-06


# Serializers
Serialization is used for performance tuning on Apache Spark. All data that is sent over the network or written to disk or persisted in the memory should be serialized for improved performance. Serialization plays an important role in costly operations.

PySpark supports custom serializers for performance tuning. The following two serializers are supported by PySpark:
- __MarshalSerializer__: Serializes objects using Python’s Marshal Serializer. This serializer is faster than PickleSerializer, but supports fewer datatypes.
- __PickleSerializer__: Serializes objects using Python’s Pickle Serializer. This serializer supports nearly any Python object, but may not be as fast as more specialized serializers.

In [103]:
# Replace the running context with a local one that uses MarshalSerializer.
sc.stop()
sc = SparkContext(master="local", appName="serialization app", serializer=MarshalSerializer())

In [104]:
# Double the numbers 0..999 and show the first ten results.
print(
    sc.parallelize(list(range(1000)))
    .map(lambda n: 2 * n)
    .take(10)
)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [105]:
# Shut down the last SparkContext now that the examples are finished.
sc.stop()