## Spark Basics

In [1]:
import pandas as pd
import numpy as np
import os
import warnings
import datetime

pd.set_option('display.float_format', lambda x : '{:,.2f}'.format(x))
warnings.filterwarnings("ignore")
pd.set_option('display.max_columns', None)

In [None]:
if not ('sc' in locals() or 'sc' in globals()):
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    conf.setMaster('spark://spark-master:7077')
    conf.set('spark.executor.memory', '512m')
    conf.set('spark.app.name', 'basics')


    sc = SparkContext.getOrCreate(SparkContext(conf=conf))
    
    spark = SparkSession \
        .builder \
        .getOrCreate()

<img src="../img/basics.png" />

### RDD

RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark which is fault-tolerant, immutable distributed collections of objects. Immutable meaning once you create an RDD you cannot change it. Each record in RDD is divided into logical partitions, which can be computed on different nodes of the cluster. 

In other words, RDDs are a collection of objects similar to list in Python, with the difference being RDD is computed on several processes scattered across multiple physical servers also called nodes in a cluster while a Python collection lives and process in just one process.

Additionally, RDDs provide data abstraction of partitioning and distribution of the data designed to run computations in parallel on several nodes, while doing transformations on RDD we don’t have to worry about the parallelism as PySpark by default provides.

This Apache PySpark RDD tutorial describes the basic operations available on RDDs, such as map(), filter(), and persist() and many more. In addition, this tutorial also explains Pair RDD functions that operate on RDDs of key-value pairs such as groupByKey() and join() etc.

#### Benefits

__Immutability__
PySpark RDD’s are immutable in nature meaning, once RDDs are created you cannot modify. When we apply transformations on RDD, PySpark creates a new RDD and maintains the RDD Lineage.

__Fault Tolerance__
PySpark operates on fault-tolerant data stores on HDFS, S3 e.t.c hence any RDD operation fails, it automatically reloads the data from other partitions. Also, When PySpark applications running on a cluster, PySpark task failures are automatically recovered for a certain number of times (as per the configuration) and finish the application seamlessly.

__Lazy Evolution__
PySpark does not evaluate the RDD transformations as they appear/encountered by Driver instead it keeps the all transformations as it encounters(DAG) and evaluates the all transformation when it sees the first RDD action.

__Partitioning__
When you create RDD from a data, It by default partitions the elements in a RDD. By default it partitions to the number of cores available.

#### PySpark RDD Limitations

PySpark RDDs are not much suitable for applications that make updates to the state store such as storage systems for a web application. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases. The goal of RDD is to provide an efficient programming model for batch analytics and leave these asynchronous applications.

### Create an RDD

<img src="../img/spark-partitions-2.jpg" />

In [3]:
# Create RDD from parallelize    
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd = spark.sparkContext.parallelize(dataList)


# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("../data/hello_world.txt")

In [None]:
rdd.take(1)

In [None]:
rdd.take(1)

In [None]:
rdd2.take(1)

### Transformations

Transformations on Spark RDD returns another RDD and transformations are __lazy meaning they don’t execute until you call an action on RDD__. Some transformations on RDD’s are flatMap(), map(), reduceByKey(), filter(), sortByKey() and return new RDD instead of updating the current.

Also, RDD Transformations are Spark operations when executed on RDD, it results in a single or multiple new RDD’s. Since RDD are __immutable in nature__, transformations always create __new RDD__ without updating an existing one hence, this creates an RDD lineage.

<img src="../img/linage.png" width="500"/>

### RDD Transformations types

#### Narrow Transformation

Narrow transformations are the result of map() and filter() functions and these compute data that live on a single partition meaning there will not be any data movement between partitions to execute narrow transformations.

<img src="../img/narrow-transformation.png" width="500"/>

#### Map

Applies a given function to an RDD.

In [None]:
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data,4)
squared_rdd = rdd.map(lambda x:x**2)
squared_rdd.collect()

In [None]:
squared_rdd

#### Filter

Takes as input a condition and keeps only those elements that fulfill that condition.

In [None]:
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data,4)
filtered_rdd = rdd.filter(lambda x:x%2==0)
filtered_rdd.collect()

#### FlatMap

Similar to map, but each input item can be mapped to 0 or more output items.

In [None]:
data = [1,2,3,4]
rdd = sc.parallelize(data,4)
flat_rdd = rdd.flatMap(lambda x:[x,x**3])
flat_rdd.collect()

#### Wider Transformation
Wider transformations are the result of groupByKey() and reduceByKey() functions and these compute data that live on many partitions meaning there will be data movements between partitions to execute wider transformations. Since these shuffles the data, they also called shuffle transformations.

<img src="../img/wide-transformation.png" width="500"/>

#### Distinct

Returns only distinct elements in an RDD.

In [None]:
data = [1,2,2,2,2,3,3,3,3,4,5,6,7,7,7,8,8,8,9,10]
rdd = sc.parallelize(data,4)
distinct_rdd = rdd.distinct()
distinct_rdd.collect()

#### Reduce By Key

In [None]:
data = [('Apple','Fruit',200),('Banana','Fruit',24),('Tomato','Fruit',56),('Potato','Vegetable',103),('Carrot','Vegetable',34)]
rdd = sc.parallelize(data,4)

category_price_rdd = rdd.map(lambda x: (x[1],x[2]))
category_price_rdd.collect()

In [None]:
category_total_price_rdd = category_price_rdd.reduceByKey(lambda x,y:x+y)
category_total_price_rdd.collect()

#### Group By Key

In [None]:
data = [('Apple','Fruit',200),('Banana','Fruit',24),('Tomato','Fruit',56),('Potato','Vegetable',103),('Carrot','Vegetable',34)]
rdd = sc.parallelize(data,4)
category_product_rdd = rdd.map(lambda x: (x[1],x[0]))
category_product_rdd.collect()

In [None]:
grouped_products_by_category_rdd = category_product_rdd.groupByKey()
findata = grouped_products_by_category_rdd.collect()
for data in findata:
    print(data[0],list(data[1]))

### Actions

RDD actions are operations that return the raw values, In other words, any RDD function that returns other than RDD[T] is considered as an action in spark programming.

As mentioned in RDD Transformations, all transformations are lazy meaning they do not get executed right away and action functions trigger to execute the transformations.

### RDD Actions

Narrow transformations are the result of map() and filter() functions and these compute data that live on a single partition meaning there will not be any data movement between partitions to execute narrow transformations.

#### collect()

 It takes the whole RDD and brings it back to the driver program.

#### take()
Sometimes you will need to see what your RDD contains without getting all the elements in memory itself. take returns a list with the first n elements of the RDD.

In [None]:
rdd = sc.parallelize([1,2,3,4,5])
rdd.take(3)

#### takeOrdered()

In [36]:
rdd = sc.parallelize([5,3,12,23])

In [None]:
# descending order
rdd.takeOrdered(3,lambda s:-1*s)

In [None]:
# ascending order
rdd.takeOrdered(3,lambda s:1*s)