<img src="spark.png" alt="drawing" width="200"/>

# Introduction to PySpark




In [91]:
from pyspark import SparkConf 
from pyspark.context import SparkContext 

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]").setAppName("EXAMPLES RDD"))

# Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

## Parallelized collection

Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create two parallelized collections: one holding the numbers 1 to 5 and the other holding a string.

In [92]:
numRDD = sc.parallelize([1, 2, 3, 4, 5])
print("numRDD:   ", type(numRDD))  # confirm that the object is an RDD

# A string is iterable, so this creates an RDD with one element per character
helloRDD = sc.parallelize("Hello world")
print("helloRDD: ", type(helloRDD))  # confirm that the object is an RDD

numRDD:    <class 'pyspark.rdd.RDD'>
helloRDD:  <class 'pyspark.rdd.RDD'>


## External Datasets

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Here is an example invocation:

In [108]:
fileRDD = sc.textFile("ratings.csv")

# take(3) is an action: it returns the first three lines as a Python list
first_lines = fileRDD.take(3)

for line in first_lines:
    print(line)


userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817


## RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

### Transformations

#### map( ) - Return a new RDD by applying a function to each element of this RDD.

In [48]:
RDD = sc.parallelize([1,2,3,4,5])
RDD_map = RDD.map(lambda x : x * 2)
print("RDD_map: ", RDD_map.collect())  # collect() is an action that returns a list

RDD_map:  [2, 4, 6, 8, 10]


#### filter( ) - Return a new RDD containing only the elements that satisfy a condition.

In [49]:
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x : x >2)
print("RDD_filter: ", RDD_filter.collect())  # collect() is an action that returns a list

RDD_filter:  [3, 4]


#### flatMap( ) - Return a new RDD by applying a function that yields zero or more values per element and flattening the results.

In [50]:
RDD = sc.parallelize(["hello world", "How are you"])
RDD_flatMap = RDD.flatMap(lambda x : x.split(" "))
print("RDD_flatMap: ", RDD_flatMap.collect())  # collect() is an action that returns a list

RDD_flatMap:  ['hello', 'world', 'How', 'are', 'you']


#### union( ) - Return the union of this RDD and another one.

In [51]:
rdd01 = sc.parallelize([1, 3, 5, 7])
rdd02 = sc.parallelize([2, 4, 6, 8])
rdd03 = rdd01.union(rdd02)
rdd03.collect()

[1, 3, 5, 7, 2, 4, 6, 8]

In [43]:
# Print the file_path
file_path = "/content/rating.cvs"
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

The file_path is /content/rating.cvs
The file type of fileRDD is <class 'pyspark.rdd.RDD'>


In [44]:
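# Check the number of partitions in fileRDD.
# textFile is lazy, so a missing or misspelled file_path only raises an error here,
# when the partitions are first computed.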
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions = 5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

Py4JJavaError: ignored

**Transformations** create a new dataset from an existing one

### Actions 

#### collect( ) – Return a list that contains all of the elements in this RDD.
Note: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

In [52]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

newData = rdd.collect()
for d in newData:
    print (f"Value: {d}")


Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12


#### take(num) – Take the first num elements of the RDD.

In [53]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

newData = rdd.take(2)
for d in newData:
    print (f"Value: {d}")

Value: 1
Value: 2


#### first( ) – Returns the first record of the RDD

In [54]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

newData = rdd.first()
print (f"Value: {newData}")

Value: 1


#### count( ) – Returns the number of records in an RDD

In [55]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

num = rdd.count()
print (f"Count: {num}")

Count: 12


#### max( ) – Returns max record

In [56]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

num = rdd.max()
print (f"Max: {num}")

Max: 12


#### reduce( ) – Reduces the records to a single value; it can be used, for example, to count or sum.

In [57]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd  = sc.parallelize(data)

total = rdd.reduce(lambda a, b: a + b)
print(f"Sum: {total}")

Sum: 78


## Pair RDDs

Pair RDDs are RDDs whose elements are key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, and the value is the data associated with that key.

### Creating Pair RDDs

Two common ways to create a pair RDD:
 * From a list of key-value tuples
 * From a regular RDD

#### Create a Pair RDD from regular RDD

In [75]:

rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())

[('a', 1), ('b', 1), ('c', 1)]

#### Create a Pair RDD from a list

In [76]:
rdd = sc.parallelize([(1,"a"), (2,"b"), (3,"c")])
rdd.collect()

[(1, 'a'), (2, 'b'), (3, 'c')]

### Transformations on pair RDDs
All regular transformations work on pair RDDs. The functions you pass must operate on key-value pairs rather than on individual elements.

#### reduceByKey(fun) - Merges the values for each key using the given function.

In [77]:
rdd = sc.parallelize([("a",1), ("b",2), ("c", 10),("a", 2), ("d", 5), ("a", 4) ])
rdd_reduceByKey = rdd.reduceByKey(lambda x, y: x+y )
rdd_reduceByKey.collect()

[('b', 2), ('c', 10), ('a', 7), ('d', 5)]

#### sortByKey( ) - Sorts a pair RDD by key.

In [78]:
rdd = sc.parallelize([("a",1), ("c",2), ("b", 10),("a", 2), ("d", 5), ("a", 4) ])
rdd_reduceByKey = rdd.reduceByKey(lambda x, y: x+y )
rdd_reduceByKey.sortByKey(ascending = True).collect()

[('a', 7), ('b', 10), ('c', 2), ('d', 5)]

#### groupByKey( ) - Groups all the values that share a key into one iterable per key.

In [79]:
rdd = sc.parallelize([("a",1), ("c",2), ("b", 10),("a", 2), ("d", 5), ("a", 4) ])
rdd_groupByKey = rdd.groupByKey().collect()
for letter, value in  rdd_groupByKey:
    print (letter, list(value))

c [2]
b [10]
a [1, 2, 4]
d [5]


#### join( ) - Transformation that joins two pair RDDs on their keys, keeping only keys present in both.

In [80]:
rdd01 = sc.parallelize([("a",1), ("b", 5),("c", 7) ])
rdd02 = sc.parallelize([("a",2), ("b", 3),("d", 4) ])

rdd01.join(rdd02).collect()


[('a', (1, 2)), ('b', (5, 3))]

#### countByKey( ) - Action that counts the number of elements for each key.

In [87]:
rdd = sc.parallelize([("a",2), ("b", 4),("a", 3) ])
for key, val in  rdd.countByKey().items():
    print (key, val)

a 2
b 1


#### collectAsMap( ) - Action that returns the key-value pairs in the RDD as a dictionary.

In [90]:
rdd = sc.parallelize([("a",2), ("b", 4),("c", 3) ])
rdd.collectAsMap()

{'a': 2, 'b': 4, 'c': 3}