# PySpark with RDDs

* RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark.
* An RDD is an immutable collection of objects that is computed on different nodes of the cluster.
* Each dataset in a Spark RDD is logically partitioned across many servers so that its partitions can be computed on different nodes of the cluster.

* Resilient :-- fault-tolerant thanks to the RDD lineage graph (DAG), so Spark can recompute missing or damaged partitions after node failures.
* Distributed :-- the data resides on multiple nodes.
* Dataset :-- represents the records of the data you work with. The user can load the dataset from external sources such as a JSON file, CSV file, text file, or a database via JDBC, with no specific data structure required.

# Features of Spark RDD

#### 1. In-memory Computation

Spark RDDs support in-memory computation: intermediate results can be kept in distributed memory (RAM) instead of stable storage (disk).

#### 2. Lazy Evaluation

All transformations in Apache Spark are lazy: they do not compute their results right away. Instead, Spark just remembers the transformations applied to some base dataset.

Spark computes the transformations only when an action requires a result to be returned to the driver program.

#### 3. Fault Tolerance

Spark RDDs are fault tolerant because they track lineage information to rebuild lost data automatically on failure. Each RDD remembers how it was created from other datasets (by transformations such as map, join, or groupBy), so Spark can recreate any lost partition by replaying that lineage.

#### 4. Immutability

An RDD cannot be modified once it is created, so its data is safe to share across processes. It can also be recreated or retrieved at any time, which makes caching, sharing, and replication easy. Immutability is therefore a way to achieve consistency in computations.

#### 5. Partitioning

The partition is the fundamental unit of parallelism in a Spark RDD. Each partition is a logical division of the data and, like the RDD itself, is immutable. New partitions are created by applying transformations to existing partitions.

#### 6. Persistence

Users can state which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage or on disk).

#### 7. Coarse-grained Operations

An operation is applied to all elements of a dataset at once, through operations such as map, filter, or groupBy.


#### 8. Location Stickiness

RDDs can define placement preferences for computing their partitions. A placement preference is information about where the data of an RDD partition is located. The DAGScheduler places tasks as close to the data as possible, which speeds up computation.

## RDDs in Apache Spark support two types of operations:

#### 1. Transformations
Spark RDD transformations are functions that take an RDD as input and produce one or more RDDs as output. They do not change the input RDD (RDDs are immutable), but always produce one or more new RDDs by applying the computations they represent, e.g. map(), filter(), reduceByKey().
* a. Narrow Transformations
    * These result from map, filter, and similar functions, where the data for each output partition comes from a single parent partition, i.e. the computation is self-sufficient. An output RDD has partitions with records that originate from a single partition in the parent RDD, so only a limited subset of partitions is used to calculate the result. Spark groups consecutive narrow transformations into a single stage, which is known as pipelining.
* b. Wide Transformations
    * These result from functions such as groupByKey() and reduceByKey(). The data required to compute the records in a single partition may live in many partitions of the parent RDD. Wide transformations are also known as shuffle transformations because they generally require a shuffle, i.e. redistributing data across partitions.

#### 2. Actions
* An action in Spark returns the final result of RDD computations. It triggers execution using the lineage graph to load the data into the original RDD, carry out all intermediate transformations, and return the final result to the driver program or write it out to a file system. The lineage graph is the dependency graph of all the parent RDDs of an RDD.

* Actions are RDD operations that produce non-RDD values. They materialize a value in a Spark program and are one of the ways to send results from the executors to the driver. first(), take(), reduce(), collect(), and count() are some of the actions in Spark.

# Creating an RDD (one way)

In [4]:
from pyspark.sql import SparkSession

In [5]:
# create a new SparkSession, or reuse the one that is already running
spark = SparkSession.builder.getOrCreate()

In [6]:
rdd = spark.sparkContext.parallelize([(1,'vineeth',500,'A'),
                                        (2,'sudhanshu',450,'B'),
                                         (3,'krish Naik',500,'A'),
                                         (4,'sunny',350,'C'),
                                         (5,'hitesh',600,'A+'),
                                         (6,'himesh',500,'A'),
                                         (7,'naveen',550,'A+'),])

In [7]:
rdd.collect()

[(1, 'vineeth', 500, 'A'),
 (2, 'sudhanshu', 450, 'B'),
 (3, 'krish Naik', 500, 'A'),
 (4, 'sunny', 350, 'C'),
 (5, 'hitesh', 600, 'A+'),
 (6, 'himesh', 500, 'A'),
 (7, 'naveen', 550, 'A+')]

In [8]:
## type of data
type(rdd)

pyspark.rdd.RDD

In [9]:
## an RDD has no schema, so we pass column names when converting it to a DataFrame
df_type_rdd = spark.createDataFrame(rdd,schema=["Id","name","marks","grade"])

In [10]:
df_type_rdd.collect()

[Row(Id=1, name='vineeth', marks=500, grade='A'),
 Row(Id=2, name='sudhanshu', marks=450, grade='B'),
 Row(Id=3, name='krish Naik', marks=500, grade='A'),
 Row(Id=4, name='sunny', marks=350, grade='C'),
 Row(Id=5, name='hitesh', marks=600, grade='A+'),
 Row(Id=6, name='himesh', marks=500, grade='A'),
 Row(Id=7, name='naveen', marks=550, grade='A+')]

In [11]:
type(df_type_rdd)

pyspark.sql.dataframe.DataFrame

In [12]:
# show the dataset (truncate=False prints full column values)
df_type_rdd.show(truncate=False)

+---+----------+-----+-----+
|Id |name      |marks|grade|
+---+----------+-----+-----+
|1  |vineeth   |500  |A    |
|2  |sudhanshu |450  |B    |
|3  |krish Naik|500  |A    |
|4  |sunny     |350  |C    |
|5  |hitesh    |600  |A+   |
|6  |himesh    |500  |A    |
|7  |naveen    |550  |A+   |
+---+----------+-----+-----+



## RDD Narrow Transformation Examples

* map
* flatMap
* mapPartitions
* mapPartitionsWithIndex
* filter
* sample
* union


In [29]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [33]:
rdd = sc.textFile('data.txt')

In [34]:
rdd.collect()

['When "data" is used more generally as a synonym for "information", it is treated as a mass noun in singular form. This usage is common in everyday language and in technical and scientific fields such as software development and computer science. One example of this usage is the term "big data". When used more specifically to refer to the processing and analysis of sets of data, the term retains its plural form. This usage is common in natural sciences, life sciences, social sciences, software development and computer science, and grew in popularity in the 20th and 21st centuries. Some style guides do not recognize the different meanings of the term, and simply recommend the form that best suits the target audience of the guide. For example, APA style as of the 7th edition requires "data" to be treated as a plural form.[7].']

In [52]:
type(rdd)

pyspark.rdd.RDD

In [38]:
dff = rdd.first()

In [51]:
type(dff)

str

In [48]:
# str.split is plain Python: it splits the first line on commas and is not an RDD operation
columns = dff.split(',')

In [49]:
columns

['When "data" is used more generally as a synonym for "information"',
 ' it is treated as a mass noun in singular form. This usage is common in everyday language and in technical and scientific fields such as software development and computer science. One example of this usage is the term "big data". When used more specifically to refer to the processing and analysis of sets of data',
 ' the term retains its plural form. This usage is common in natural sciences',
 ' life sciences',
 ' social sciences',
 ' software development and computer science',
 ' and grew in popularity in the 20th and 21st centuries. Some style guides do not recognize the different meanings of the term',
 ' and simply recommend the form that best suits the target audience of the guide. For example',
 ' APA style as of the 7th edition requires "data" to be treated as a plural form.[7].']

In [50]:
type(columns)

list

Some examples of narrow transformations:

In [None]:
## map: apply a function to every element (here, pair each number with its square)
x_map = sc.parallelize([1,2,3,4,5,6,7,8,9])
y_map = x_map.map(lambda x: (x,x**2))
print(x_map.collect())
print(y_map.collect())

In [None]:
## flatMap: each element can produce several outputs, which are flattened into a single RDD
x_map = sc.parallelize([1,2,3,4,5,6,7,8,9])
y_map = x_map.flatMap(lambda x: (x,x**2,100*x))
print(x_map.collect())
print(y_map.collect())

In [None]:
## filter: keep only the elements for which the predicate is True (here, the even numbers)
x_filter = sc.parallelize([1,2,3,4,5,6,7,8,9])
y_filter_rdd = x_filter.filter(lambda x: x % 2 == 0)
print(x_filter.collect())
print(y_filter_rdd.collect())

In [None]:
## mapPartitions: the data is split into 3 partitions and f is called once per partition,
## yielding one value per partition (the sum of its elements)
x_partition = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
def f(iterator):
    yield sum(iterator)
y = x_partition.mapPartitions(f)
print(x_partition.glom().collect())
print(y.glom().collect())

In [None]:
## mapPartitionsWithIndex: like mapPartitions, but f also receives the index of the partition
x_mpwi = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
def f(partitionIndex, iterator):
    yield (partitionIndex, sum(iterator))
y_mpwi = x_mpwi.mapPartitionsWithIndex(f)
print(x_mpwi.glom().collect())
print(y_mpwi.glom().collect())

In [None]:
# sample: withReplacement=False means an element is picked at most once;
# fraction=0.4 is the expected fraction, so the sample size varies between runs
x_samp = sc.parallelize([1,2,3,4,5,6,7,8,9])
y_samp = x_samp.sample(withReplacement=False, fraction=0.4)
print(x_samp.collect())
print(y_samp.collect())

In [None]:
## union returns a new RDD containing the elements of both RDDs (duplicates are kept)
rdd1 = sc.parallelize([1,2,3,4,5,6,7])
rdd2 = sc.parallelize([1,2,3,3,4,4,5])
rdd1.union(rdd2).collect()