In [1]:
import findspark
findspark.init() 

In [2]:
from pyspark import SparkConf, SparkContext

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

In [3]:
conf = SparkConf().setAppName("myapp").setMaster("local")
sc = SparkContext(conf=conf)

Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

In [4]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list.

In [5]:
distData.reduce(lambda a, b: a + b) 

15

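One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Normally Spark sets the number of partitions automatically based on your cluster, but you can also set it manually by passing it as the second argument to parallelize, e.g. sc.parallelize(data, 10).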
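
To make the idea of partitions concrete, here is a minimal pure-Python sketch. It is an illustrative model only, not Spark's actual slicing logic, and the helper name split_into_partitions is hypothetical. It cuts a list into contiguous slices, computes a partial sum per slice as if each slice were one task, and then combines the partial sums.

```python
def split_into_partitions(data, num_partitions):
    """Cut data into num_partitions contiguous slices (illustrative only)."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


data = [1, 2, 3, 4, 5]
partitions = split_into_partitions(data, 2)

# One "task" per partition: each computes its partial sum independently.
partial_sums = [sum(part) for part in partitions]

# The partial results are then combined into the final answer.
total = sum(partial_sums)
print(partitions, partial_sums, total)
```

More partitions mean more, smaller tasks; the combined result is the same either way.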

### External Datasets

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines.

In [6]:
distFile = sc.textFile("Puppy.txt")

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: 

In [7]:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

133

By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value as the optional second argument to textFile (minPartitions). Note that you cannot have fewer partitions than blocks.

### Sequence Files

PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the resulting Java objects using Pyrolite. When saving an RDD of key-value pairs to SequenceFile, PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables.

In [11]:
import os
os.getcwd()

'C:\\Users\\Suraj\\PySpark'

In [12]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("C:\\Users\\Suraj\\PySpark\\dummy.txt")
sorted(sc.sequenceFile("C:\\Users\\Suraj\\PySpark\\dummy.txt").collect())

[(1, 'a'), (2, 'aa'), (3, 'aaa')]

### RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.
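
Laziness can be pictured without Spark at all. The sketch below is a plain-Python analogy, not Spark code: the built-in map plays the role of a transformation and sum plays the role of an action. The evaluated list is a hypothetical helper that records when the function is really applied.

```python
evaluated = []  # records every element the function is actually applied to


def line_length(s):
    evaluated.append(s)
    return len(s)


lines = ["a puppy", "runs", "fast"]

# Like a transformation: this only builds a recipe, nothing is computed yet.
lazy_lengths = map(line_length, lines)
calls_before = len(evaluated)

# Like an action: consuming the result forces the computation to run.
total = sum(lazy_lengths)
calls_after = len(evaluated)

print(calls_before, total, calls_after)
```

The function is not called until the result is actually requested, which is the same reason a Spark map returns immediately while the following reduce does the work.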

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.

#### Persist

If we also want to use lineLengths again later, we can call persist on it before the reduce, which causes lineLengths to be kept in memory after the first time it is computed.

In [13]:
lines = sc.textFile("Puppy.txt")
lineLengths = lines.map(lambda s: len(s))
lineLengths.persist()
totalLength = lineLengths.reduce(lambda a, b: a + b)

In [14]:
lineLengths

PythonRDD[20] at RDD at PythonRDD.scala:53

In [15]:
totalLength

133
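
The benefit of persisting shows up only when a second action runs on the same dataset. The toy class below is a rough pure-Python model of that behaviour, not Spark's implementation; ToyDataset and its computations counter are hypothetical names used only for illustration.

```python
import functools


class ToyDataset:
    """Toy stand-in for an RDD: recomputed on every action unless persisted."""

    def __init__(self, source, func):
        self._source = source
        self._func = func
        self._persist = False
        self._cache = None
        self.computations = 0  # how many times the data was recomputed

    def persist(self):
        # Only marks the dataset; nothing is computed at this point.
        self._persist = True
        return self

    def _materialize(self):
        if self._cache is not None:
            return self._cache
        self.computations += 1
        values = [self._func(x) for x in self._source]
        if self._persist:
            self._cache = values  # keep the results for later actions
        return values

    def reduce(self, op):
        return functools.reduce(op, self._materialize())


def add(a, b):
    return a + b


lines = ["a puppy", "runs", "fast"]

plain = ToyDataset(lines, len)
plain.reduce(add)
plain.reduce(add)  # the lengths are computed a second time

persisted = ToyDataset(lines, len).persist()
persisted.reduce(add)
persisted.reduce(add)  # served from the cached lengths

print(plain.computations, persisted.computations)
```

In this model, persist itself does no work; the cache is filled by the first action and reused by the second.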

### Passing Functions to Spark

In [18]:
def myFunc(s):
    words = s.split(" ")
    return len(words)

lenvar = sc.textFile("Puppy.txt").map(myFunc).collect()

In [19]:
lenvar

[4, 4, 4, 5, 7]

#### Closures and Accumulators

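Before running a task, Spark computes its closure: the variables and methods that must be visible to the executor to perform its computations on the RDD. The closure is serialized and sent to each executor, so a function that modifies a variable defined outside its own scope only updates the executor's copy of that variable. In the example below, the counter on the driver is never changed, which is why the printed value is still 0.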

In [20]:
counter = 0
rdd = sc.parallelize([1,2,3,4,5])

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

Counter value:  0
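
Why the counter stays at 0 can be sketched in plain Python. This is a simplified model under stated assumptions, not how Spark is implemented: pickle stands in for closure serialization, and run_task and driver_state are hypothetical names. The "executor" updates its own deserialized copy, so the driver only sees a change when a result is explicitly sent back and merged. In Spark itself, accumulators are the mechanism for this kind of shared update; the sketch does not use them.

```python
import pickle

driver_state = {"counter": 0}  # lives on the "driver"


def run_task(serialized_closure, partition):
    """Simulate an executor working on a deserialized copy of the closure."""
    state = pickle.loads(serialized_closure)  # private copy, not the original
    for x in partition:
        state["counter"] += x
    return state["counter"]  # partial result reported back to the driver


partial = run_task(pickle.dumps(driver_state), [1, 2, 3, 4, 5])

# The executor changed only its copy; the driver's value is untouched.
counter_after_task = driver_state["counter"]

# Safe pattern: merge the returned partial result on the driver.
driver_state["counter"] += partial

print(counter_after_task, partial, driver_state["counter"])
```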
