### AST 4: PySpark Transform and Actions

## Learning Objectives

At the end of the experiment, you will be able to:

* Perform RDD (Resilient Distributed Datasets) operations including:
        
  1.   Transformations
  2.   Actions

* Obtain an overview of shuffle operations
* Implement an RDD-based model


## Information

**Overview of Spark, PySpark and Apache Spark in simple language**

**Spark:** Short for Apache Spark, a data computation framework that handles Big Data.

**PySpark:** The Python API for Spark.

**Apache Spark:** An open-source cluster-computing framework, built around speed, ease of use, and streaming analytics.

* PySpark lets data scientists work with Resilient Distributed Datasets (RDDs) from Python. It is also used to work with DataFrames and with machine learning algorithms.

### ***Spark RDD is a major concept in Apache Spark***

**Resilient Distributed Datasets:**

**Resilient:**    because RDDs are immutable (they cannot be modified once created) and fault tolerant: a lost partition can be recomputed from its lineage.

**Distributed:**  because the data is distributed across the nodes of a cluster.

**Dataset:**      because it holds data.

**Why RDD?**

* Apache Spark lets you treat your input files almost like any other variable, which you cannot do in Hadoop MapReduce.
* RDDs are automatically distributed across the network by means of Partitions.

RDDs are divided into smaller chunks called partitions, and when you execute an action, one task is launched per partition. More partitions therefore allow more parallelism, up to the number of cores available in the cluster.

Spark automatically decides the number of partitions that an RDD has to be divided into, but you can also specify the number of partitions when creating an RDD. These partitions of an RDD are distributed across all the nodes in the network.
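
To make the idea of partitions concrete, the sketch below splits a small collection into contiguous chunks in plain Python, without Spark. The helper `split_into_partitions` is hypothetical, and its even-split rule is an assumption chosen for illustration; it is not presented as Spark's actual partitioning logic.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into contiguous chunks of near-equal size (illustrative rule)."""
    n = len(items)
    return [
        items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]

partitions = split_into_partitions(list(range(1, 10)), 3)
# One "task" per partition: each chunk can be processed independently.
partial_sums = [sum(chunk) for chunk in partitions]
print(partitions)    # [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(partial_sums)  # [6, 15, 24]
```

Each chunk is summed on its own, which mirrors how one task per partition can run in parallel.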

**Difference between DataFrame and RDD (Resilient Distributed Datasets):**

**DataFrame:**
* Carries a schema (named, typed columns), which Spark can often infer from the data.
* Usually performs aggregations faster than RDDs, because DataFrame operations pass through Spark's query optimizer. It also provides a simple API for aggregation operations.

**RDD:**
* Has no built-in schema; the structure of each record has to be handled manually in your code.
* Is usually slower than DataFrames for simple operations such as grouping the data.


**Creating an RDD**

**There are three ways to create an RDD in Spark:**
1. Parallelizing an existing collection in the driver program.

  The key parameter of a parallelized collection is the number of partitions the dataset is divided into. Spark runs one task for each partition. Typically, two to four partitions per CPU in the cluster work well. By default, Spark sets the number of partitions automatically based on the cluster.

2. Referencing a dataset in an external storage system (e.g. HDFS, HBase, shared file system).
  
  In Spark, a distributed dataset can be formed from any data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase etc. Here, the data is loaded from the external dataset. In PySpark, the common loaders are:

  * `spark.read.csv(path)`: loads a CSV file and returns the result as a DataFrame.

  * `spark.read.json(path)`: loads a JSON file (one object per line) and returns the result as a DataFrame.

  * `sc.textFile(path)`: loads a text file and returns an RDD of strings, one per line.

3. Creating RDD from already existing RDDs.

  A transformation derives a new RDD from an existing one without changing the original; this is how an RDD is created from an already existing RDD. Chaining transformations in this way is one of the differences between Apache Spark and Hadoop MapReduce.

**Actions/Transformations**

There are two types of operations that you can perform on an RDD:
* Transformations
* Actions

**Transformation** applies a function to an RDD and creates a new RDD; it does not modify the RDD that the function is applied to. The new RDD also keeps a pointer to its parent RDD.

When you call a transformation, Spark does not execute it immediately. Instead, it records the transformation in a lineage. The lineage keeps track of all the transformations that have to be applied to that RDD, including where the data has to be read from.
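
The lazy behaviour described above can be modelled in a few lines of plain Python. The `LazyDataset` class is a hypothetical toy, not part of PySpark: it only records each step in a lineage and replays the steps when `collect()` is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not executed."""

    def __init__(self, source, lineage=()):
        self.source = source      # where the data comes from
        self.lineage = lineage    # ordered tuple of (name, function) steps

    def map(self, func):
        # Returns a NEW object; the parent is left untouched.
        return LazyDataset(self.source, self.lineage + (("map", func),))

    def filter(self, pred):
        return LazyDataset(self.source, self.lineage + (("filter", pred),))

    def collect(self):
        # The "action": only now is the recorded lineage replayed.
        data = list(self.source)
        for name, func in self.lineage:
            if name == "map":
                data = [func(x) for x in data]
            else:
                data = [x for x in data if func(x)]
        return data


calls = []

def double(x):
    calls.append(x)   # record that the function really ran
    return x * 2

base = LazyDataset([1, 2, 3, 4])
filtered = base.map(double).filter(lambda x: x > 4)
print(len(calls))                               # 0: nothing has executed yet
print([name for name, _ in filtered.lineage])   # ['map', 'filter']
print(filtered.collect())                       # [6, 8]
```

Until `collect()` runs, `double` is never called, which is the same reason a chain of Spark transformations returns immediately.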


**Action** triggers the computation and either returns a result to the driver program (for example, to display it) or saves it to some location. You can also print the RDD lineage information with:

`filtered.toDebugString()` (*filtered* is the RDD here).

<img style="-webkit-user-select: none;margin: auto;" src="https://cdn.iisc.talentsprint.com/CDS/Images/Pyspark_RDD.JPG" width="500" height="400">


### Setup Steps:

 **Install PySpark**

In [None]:
!pip install pyspark

**Creating Spark Session**

A Spark session is the combined entry point of a Spark application, introduced in Spark 2.0. Instead of having various contexts, everything is encapsulated in a Spark session.

In [None]:
# Start spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf  # User Defined Functions
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName('Rdd').getOrCreate()
spark

In [None]:
# Accessing sparkContext from sparkSession instance.
sc = spark.sparkContext

### Spark Python Transformations

**map()** - A map transformation is useful when we need to transform an RDD by applying a function to each element.

In [None]:
# Return a new RDD by applying a function to each element of this RDD.
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())

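**take()** - Take the first num elements of the RDD. Note that take() is an action rather than a transformation; it is introduced here because the examples use it to inspect results.

It works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.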
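
A simplified plain-Python model of this incremental scan is sketched below. It reads one partition at a time and stops as soon as enough elements are found; the function name and the one-partition-per-round rule are assumptions made for illustration, and Spark's own estimate of how many partitions to read next is more elaborate.

```python
def take(partitions, num):
    """Collect the first `num` items, reading partitions only as needed."""
    taken, scanned = [], 0
    for part in partitions:
        if len(taken) >= num:
            break
        scanned += 1
        taken.extend(part[: num - len(taken)])
    return taken, scanned

# Ten hypothetical partitions holding 0..99, ten items each.
parts = [list(range(i, i + 10)) for i in range(0, 100, 10)]
print(take(parts, 3))      # ([0, 1, 2], 1): one partition was enough
first12, scanned = take(parts, 12)
print(scanned)             # 2: the second partition was needed as well
```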

In [None]:
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) #take()

In [None]:
sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3) #take()

**flatMap()** - The flatMap transformation returns a new RDD by first applying a function to all elements of this RDD and then flattening the results. This flattening is the main difference between the *flatMap* and *map* transformations.

In [None]:
s0 = sc.parallelize([3,4,5])
s0.flatMap(lambda x: [x, x*x]).collect()

Compare the same function using map()

In [None]:
sc.parallelize([3,4,5]).map(lambda x: [x,  x*x]).collect()

**filter()** - The filter transformation returns a new dataset formed by selecting those elements of the source for which the given function returns true.

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect() # Return a new RDD containing only the elements that satisfy a predicate.

**groupByKey()** - The groupByKey transformation can be applied to an RDD of (key, value) pairs. It groups the values for each key in the original RDD and creates a new pair RDD in which each original key corresponds to the collected group of its values.

In [None]:
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
x.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()

**reduceByKey()** - Merge the values for each key using an associative and commutative reduce function.

In [None]:
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
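
One practical difference between groupByKey() and reduceByKey() is that reduceByKey() can combine values inside each partition before data is moved between partitions. The plain-Python sketch below models that idea; the two partitions and the helper `combine_within_partition` are hypothetical and only illustrate the principle.

```python
from operator import add

def combine_within_partition(partition, func):
    """Pre-aggregate one partition's (key, value) pairs before any shuffle."""
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())

# Two hypothetical partitions of (key, value) pairs.
partitions = [[("a", 1), ("b", 1), ("a", 1)], [("a", 1), ("b", 1)]]

# groupByKey-style: every pair is moved as it is.
moved_without_combine = sum(len(p) for p in partitions)

# reduceByKey-style: combine locally first, then merge the partial results.
pre_combined = [combine_within_partition(p, add) for p in partitions]
moved_with_combine = sum(len(p) for p in pre_combined)

totals = {}
for part in pre_combined:
    for key, value in part:
        totals[key] = add(totals[key], value) if key in totals else value

print(moved_without_combine, moved_with_combine)  # 5 4
print(sorted(totals.items()))                     # [('a', 3), ('b', 2)]
```

On real data with many repeated keys, the gap between the two counts is typically much larger than in this tiny example.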

**mapPartitions()** - Similar to map, but the function runs once per partition (block) of the RDD and receives an iterator over that partition's elements.

In [None]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']

wordsRDD = sc.parallelize(wordsList, 4) # number of partitions - 4

print(wordsRDD.collect())

itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
# mapPartitions() calls the function once per partition; here the 4th partition holds two words, which are joined as 'rat,cat'.
print(itemsRDD.collect())

In [None]:
L = range(1,10)

parallel = sc.parallelize(L, 3) # number of partitions - 3

def f(iterator):
  yield sum(iterator)

parallel.mapPartitions(f).collect()

# Results [6, 15, 24] are created because mapPartitions() runs once per partition: Partition 1: 1+2+3 = 6, Partition 2: 4+5+6 = 15, Partition 3: 7+8+9 = 24


In [None]:
rdd = sc.parallelize([1, 2, 3, 4], 2) # number of partitions - 2

def f(iterator):
  yield sum(iterator)

rdd.mapPartitions(f).collect()

# Results [3, 7], partition 1 : 1+2 = 3, partition 2 : 3+4 =7

**mapPartitionsWithIndex()** - Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

In [None]:
rdd = sc.parallelize([1, 2, 3, 4], 4)  # number of partitions - 4
def f(splitIndex, iterator): yield splitIndex  # emit only the partition index
rdd.mapPartitionsWithIndex(f).sum()  # 0 + 1 + 2 + 3 = 6

### Spark Python Actions

**Creating an RDD to explain "RDD actions with Examples"**

In [None]:
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]

inputRDD = spark.sparkContext.parallelize(data)

listRdd = spark.sparkContext.parallelize([1,2,3,4,5,3,2])

from operator import add

The two RDDs created above are used as needed to demonstrate the RDD actions.

**first()** – Return the first element in the dataset.

In [None]:
#first
print("first :  "+str(listRdd.first()))
print("first :  "+str(inputRDD.first()))

**take()** – Return the first num elements of the dataset.

In [None]:
#take()
print("take : "+str(listRdd.take(2)))

**takeSample()** – Return a fixed-size random sample of the dataset as a list.

In [None]:
print("takeSample : "+str(listRdd.takeSample(False, 3))) # sample 3 elements without replacement from [1,2,3,4,5,3,2]

**takeOrdered()** – Return the first num elements of the dataset in ascending order (the smallest ones). This is the opposite of the top() action.

In [None]:
print("takeOrdered : "+ str(listRdd.takeOrdered(2)))

**collect()** - Return the complete dataset to the driver as a list. Use it only when the result fits in the driver's memory.

In [None]:
#Collect
data = listRdd.collect()
print(data)

**count()** – Return the count of elements in the dataset.

In [None]:
print("Count : "+str(listRdd.count()))

**countByValue()** – Return a dictionary in which each key is a unique value in the dataset and each value is the number of times it occurs.

In [None]:
print("countByValue :  "+str(listRdd.countByValue()))

**reduce()** – Reduces the elements of the dataset using the specified commutative and associative binary operator.

In [None]:
redRes=listRdd.reduce(add)
print(redRes)

**top()** – Return the n largest elements from the dataset, in descending order.

In [None]:
print("top : "+str(listRdd.top(2)))
print("top : "+str(inputRDD.top(2)))
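
The relationship between takeOrdered() and top() can be checked on ordinary Python lists. The sketch below uses the standard-library `heapq` module on the same values as `listRdd` and `inputRDD`; it is a plain-Python analogy, not Spark code.

```python
import heapq

numbers = [1, 2, 3, 4, 5, 3, 2]   # same values as listRdd
pairs = [("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60)]  # same as inputRDD

print(heapq.nsmallest(2, numbers))  # [1, 2]   like takeOrdered(2)
print(heapq.nlargest(2, numbers))   # [5, 4]   like top(2)
# Tuples compare element by element, so the key string decides first.
print(heapq.nlargest(2, pairs))     # [('Z', 1), ('C', 40)]
```

For the pair data, "Z" sorts after every other key, so ("Z", 1) ranks first even though its number is the smallest.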

**fold()** - Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value."

In [None]:
foldRes=listRdd.fold(0, add)
print(foldRes)
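
The reason the zero value must be neutral becomes visible when fold is modelled in plain Python. In the sketch below, the hypothetical `fold` helper applies the zero value once per partition and once more when merging the partial results; the three-way split of the data is an assumption, since the real split depends on how Spark partitions the RDD.

```python
from functools import reduce
from operator import add

def fold(partitions, zero, op):
    """Fold each partition starting from `zero`, then fold the partial results."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)

parts = [[1, 2, 3], [4, 5], [3, 2]]   # hypothetical split of listRdd's values
print(fold(parts, 0, add))  # 20
print(fold(parts, 1, add))  # 24: the non-neutral zero is added 3 + 1 times
```

With a zero value of 0 the answer does not depend on the number of partitions; with 1 it does, which is why a neutral value is required.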

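**foldByKey()** - Similar to fold(), but applied separately to the values of each key: both take a zero value of the same type as the data in the RDD and a combining function. Unlike fold(), foldByKey() is a transformation that returns a new RDD, so collect() is used to see the result.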

In [None]:
inputRDD.foldByKey(0, add).collect()

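**reduceByKey()** - Merge the values for each key using an associative and commutative reduce function. Like foldByKey(), it is a transformation, and collect() is the action that returns the result.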

In [None]:
sorted(inputRDD.reduceByKey(add).collect())

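**combineByKey()** - Generic function to combine the elements for each key using a custom set of aggregation functions. It is also a transformation.

In [None]:
# A separate name is used here so that operator.add, imported above, is not overwritten.
def append_as_str(acc, value):
  # acc is the string built so far; value is an int (mergeValue) or a str (mergeCombiners)
  return acc + str(value)

# createCombiner=str, mergeValue=append_as_str, mergeCombiners=append_as_str
sorted(inputRDD.combineByKey(str, append_as_str, append_as_str).collect())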
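
The three functions passed to combineByKey() play different roles, which a plain-Python model can make explicit. The sketch below computes a per-key average; the `combine_by_key` helper and the two-partition split are hypothetical and only mirror the roles of createCombiner, mergeValue, and mergeCombiners.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of the three roles in combineByKey."""
    per_partition = []
    for part in partitions:
        combined = {}
        for key, value in part:
            if key in combined:
                combined[key] = merge_value(combined[key], value)
            else:
                combined[key] = create_combiner(value)
        per_partition.append(combined)

    result = {}
    for combined in per_partition:
        for key, comb in combined.items():
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result

# Hypothetical split of inputRDD's data into two partitions.
parts = [[("Z", 1), ("A", 20), ("B", 30)], [("C", 40), ("B", 30), ("B", 60)]]
sums_counts = combine_by_key(
    parts,
    lambda v: (v, 1),                          # first value seen for a key
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # another value, same partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # merge across partitions
)
averages = {k: s / n for k, (s, n) in sums_counts.items()}
print(sorted(averages.items()))
# [('A', 20.0), ('B', 40.0), ('C', 40.0), ('Z', 1.0)]
```

Carrying a (sum, count) pair as the combiner is what lets the average be computed even though an average of averages would be wrong.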

### PySpark User Defined Functions

* A PySpark UDF is a User Defined Function that is used to create a reusable
function in Spark.

* Once a UDF is created, it can be reused on multiple DataFrames and in SQL (after registering).

* The default return type of udf() is StringType.

Create a DataFrame with two columns, "Seqno" and "Name":

In [None]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

Applying UDF

In [None]:
# creating a udf using lambda; the return type is stated explicitly
convertUDF = udf(lambda z: z.upper(), StringType())
df.select(col("Seqno"), convertUDF(col("Name")).alias("Name")).show(truncate=False)
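
A UDF receives `None` for null column values, and calling `.upper()` on `None` raises an error. The sketch below shows a null-safe version of the conversion as an ordinary Python function, tested here without Spark; the name `to_upper` is hypothetical.

```python
def to_upper(value):
    """Upper-case a string, passing None through unchanged."""
    return value.upper() if value is not None else None

print(to_upper("john jones"))  # JOHN JONES
print(to_upper(None))          # None

# In the notebook, this function could be wrapped as (not executed here):
#   convertUDF = udf(to_upper, StringType())
```

The sample DataFrame above has no null names, so the lambda version works there; the guard matters once real data with missing values is used.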

#### **Shuffle Operations**


Shuffling is the mechanism PySpark uses to redistribute data across different executors and even across machines. A shuffle is triggered by certain transformations on RDDs, such as groupByKey(), reduceByKey(), and join().

Spark also supports transformations with wide dependencies, such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition can reside in many partitions of the parent dataset.

To perform these transformations, all of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy this requirement, Spark performs a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.
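
The requirement that all tuples with the same key end up in the same partition can be sketched in plain Python. The routing rule used below (hash of the key modulo the number of output partitions) is an assumption chosen for illustration, and the `shuffle_by_key` helper and input partitions are hypothetical.

```python
def shuffle_by_key(partitions, num_output_partitions):
    """Route every (key, value) pair to the output partition chosen by its key."""
    output = [[] for _ in range(num_output_partitions)]
    for part in partitions:
        for key, value in part:
            # Assumed rule for illustration: hash of the key modulo partition count.
            output[hash(key) % num_output_partitions].append((key, value))
    return output

# Three hypothetical input partitions; key 1 and key 2 each appear in two of them.
parts = [[(1, "x"), (2, "y")], [(1, "z"), (3, "w")], [(2, "v")]]
shuffled = shuffle_by_key(parts, 2)
print(shuffled[0])  # [(2, 'y'), (2, 'v')]
print(shuffled[1])  # [(1, 'x'), (1, 'z'), (3, 'w')]
```

After the shuffle, every pair for a given key sits in one partition, so a single task can process that key without looking elsewhere.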

For example, consider the following code:

`sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()`

It runs a single action, count, which depends on a sequence of three transformations on a dataset derived from a text file. This code runs in a single stage because none of the outputs of these three transformations depend on data that comes from different partitions than their inputs.

**Below is an example that implements an RDD-based model to count the words in a file**

To implement the RDD-based model, we use the text file (**Spark_Text.txt**), which contains five paragraphs of notes on Apache Spark.

We perform RDD transformations and actions on the file to count the words in it.

In [None]:
rdd = sc.textFile("Spark_Text.txt")

In [None]:
# Lower-case each line and split it into words; with map(), every line becomes one list of words.
def Func(lines):
      lines = lines.lower()
      lines = lines.split()
      return lines
rdd1 = rdd.map(Func)

In [None]:
rdd1.take(1)

In [None]:
# To get flat output (one word per element instead of one list per line), apply flatMap with the same function:
rdd2 = rdd.flatMap(Func)
rdd2.take(3)


In [None]:
# split() without arguments already drops empty strings, so this filter is only a safeguard.
rdd3 = rdd2.filter(lambda x: x != '')
rdd3.take(7)  # Check the first 7 elements of rdd3 with the take action.

In [None]:
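rdd3_mapped = rdd3.map(lambda x: (x,1))  # pair each word with a count of 1
rdd3_grouped = rdd3_mapped.groupByKey()  # alternative grouping, shown for comparison; not used below

In [None]:
# Sum the counts per word, swap to (count, word), and sort by count in descending order.
rdd3_mapped.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(200)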
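
For comparison, the same word-count steps can be run on a few lines in plain Python to see the shape of the result. The sample lines below are hypothetical stand-ins for the contents of the text file, and `collections.Counter` takes the place of the map and reduceByKey steps.

```python
from collections import Counter

# Hypothetical sample lines; the notebook reads its lines from Spark_Text.txt.
lines = ["Apache Spark is fast", "", "Spark is a cluster computing framework"]

# flatMap + filter: lower-case, split on whitespace, drop empty tokens.
words = [w for line in lines for w in line.lower().split() if w != ""]
# map to (word, 1) + reduceByKey: count occurrences per word.
counts = Counter(words)
# Swap to (count, word) and sort by count, descending.
by_count = sorted(((n, w) for w, n in counts.items()), key=lambda p: p[0], reverse=True)
print(by_count[:2])  # [(2, 'spark'), (2, 'is')]
```

The output is a list of (count, word) pairs with the most frequent words first, which is the same shape the Spark pipeline returns.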

**The example below shows Spark transformations in Python using a CSV file.**

We will use the CSV file (**Google_Books.csv**) to work on Spark transformations.

This data was acquired from the Google Books store using the Google API. Nine features were gathered for each book in the data set.

In [None]:
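book_names = sc.textFile("google_books.csv")  # the file name must match your copy exactly, including case
# Create a new RDD called "rows" by splitting every line of book_names on commas.
# Note: a plain split does not handle quoted fields that contain commas.
rows = book_names.map(lambda line: line.split(","))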
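
Splitting on every comma breaks fields that themselves contain commas inside quotes. The sketch below contrasts a plain split with Python's standard `csv` module on one hypothetical line; the title, author, and rating are made up for illustration.

```python
import csv

# Hypothetical CSV line whose title field contains a comma.
line = '"Spark, in Brief",Jane Doe,4.6'

naive = line.split(",")
parsed = next(csv.reader([line]))

print(len(naive))   # 4: the quoted title was cut in two
print(parsed)       # ['Spark, in Brief', 'Jane Doe', '4.6']
```

If the file contains such fields, the same parser could be applied per line with `book_names.map(lambda line: next(csv.reader([line])))`, assuming no field spans more than one line.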

In [None]:
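# collect() returns every row to the driver; rows.take(rows.count()) would launch an extra job just to count.
for row in rows.collect():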
  print(row[1])

In [None]:
for row in rows.take(10):
  print(row[1])

In [None]:
# filter() - Create a new RDD containing only the elements that satisfy the search filter.
# Each element of rows is a list of fields, so "in" matches only a field that equals the text exactly.
rows.filter(lambda line: "Inward Journey" in line).collect()
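
The difference between testing membership in a string and in a list of fields is easy to miss. The sketch below shows both on one hypothetical row; the row contents are invented for illustration.

```python
line = "101,Inward Journey: A Memoir,Some Publisher"   # hypothetical row
fields = line.split(",")

print("Inward Journey" in line)     # True: substring test on a string
print("Inward Journey" in fields)   # False: a list needs an exactly equal field
print(any("Inward Journey" in f for f in fields))  # True: substring test per field
```

If partial matches are wanted, the `any(...)` form can be used as the predicate inside filter().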

In [None]:
# groupByKey() - Operates on (key, value) pairs. Here column 0 is the key and the values of column 6 are grouped under it (intended to relate titles and publishers).
rows = book_names.map(lambda line: line.split(","))
titleToPublisher = rows.map(lambda n: (str(n[0]),str(n[6]) )).groupByKey()
titleToPublisher.map(lambda x : {x[0]: list(x[1])}).take(5)