### Applications of Spark
- Streaming Data
- Machine Learning
- Batch Data
- ETL Pipelines
- Full load and ongoing replication

### Why Spark?
- Speed
- Distributed
- Advanced Analytics
- Real Time
- Powerful Caching
- Fault Tolerant
- Deployment

### Spark Architecture


### Spark Ecosystem
- Spark SQL
- Spark Streaming
- Spark MLlib
- Spark GraphX

### Spark RDDs
- RDD, which stands for Resilient Distributed Dataset, is Spark's core abstraction.
- An RDD is an immutable, distributed collection of objects.
- Internally, Spark distributes the data in an RDD to different nodes across the cluster to achieve parallelism.
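To make the idea of partitioned, parallel processing concrete, here is a minimal plain-Python sketch that needs no Spark installation. The helper names `split_into_partitions` and `square_partition` are hypothetical, and the contiguous slicing scheme is an assumption for illustration only; Spark's own partitioning and scheduling are more involved.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(items, num_partitions):
    """Split a list into contiguous slices (a hypothetical scheme, not Spark's)."""
    size, extra = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions take one additional element each.
        end = start + size + (1 if i < extra else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def square_partition(partition):
    # Work that can run independently on one partition.
    return [x * x for x in partition]


data = list(range(1, 11))
partitions = split_into_partitions(data, 3)

# Each partition is processed by its own worker; pool.map keeps partition order.
with ThreadPoolExecutor(max_workers=3) as pool:
    processed = list(pool.map(square_partition, partitions))

# Stitch the per-partition results back together.
result = [x for part in processed for x in part]
print(partitions)
print(result)
```

The original list is never modified: each step builds new lists, which loosely mirrors the immutability of RDDs.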

### Transformations and Actions

Apache Spark RDDs support two types of operations: transformations and actions. A transformation is a function that produces a new RDD from existing RDDs. An action is performed when we want to work with the actual dataset. Unlike a transformation, an action does not produce a new RDD; it returns a result instead.

- Transformations create a new RDD from an existing one.
- Actions return a value to the driver program after running a computation on the RDD.
- All transformations in Spark are lazy.
- Spark only triggers the data flow when there is an action.
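Lazy evaluation can be illustrated without Spark. The sketch below uses Python's built-in lazy `map` and `filter` as an analogy: building the pipeline plays the role of transformations, and `list(...)` plays the role of an action. This is only an analogy for the behavior described above, not how Spark implements it; the `calls` list and `add_two` function are hypothetical names used to observe when work actually happens.

```python
calls = []  # records every time the mapping function actually runs


def add_two(x):
    calls.append(x)
    return x + 2


data = [1, 2, 3, 4, 5]

# "Transformations": building the pipeline does not run add_two yet.
mapped = map(add_two, data)
filtered = filter(lambda x: x % 2 == 1, mapped)
calls_before_action = len(calls)

# "Action": materializing the result pulls the data through the pipeline.
result = list(filtered)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

Nothing is computed until the final `list(...)` call, just as an RDD pipeline does no work until an action such as `collect()` is called.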

#### Creating Spark RDD

In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
conf = SparkConf().setAppName("Read File")

In [4]:
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
sc

In [6]:
rdd = sc.textFile("sample.txt")

In [7]:
rdd.collect()

                                                                                

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

### RDDs Functions

#### map()
- map() applies a function to every element of the RDD, mapping each one from one state to another.
- It will create a new RDD.
- rdd.map(lambda x: x.split())

In [8]:
rdd2 = rdd.map(lambda x: x.split(' '))

In [9]:
rdd2

PythonRDD[2] at RDD at PythonRDD.scala:53

In [10]:
rdd2.collect()

[['1', '2', '3', '4', '5'],
 ['3', '4', '5', '66', '77'],
 ['12', '43', '6', '7', '8'],
 ['12', '12', '33']]

In [11]:
# simple function
def foo(x):
    return x.split(' ')

rdd3 = rdd.map(foo)
rdd3.collect()

[['1', '2', '3', '4', '5'],
 ['3', '4', '5', '66', '77'],
 ['12', '43', '6', '7', '8'],
 ['12', '12', '33']]

In [12]:
# split each line into tokens, convert them to int, and add 2 to each
def foo(x):
    l = x.split(' ')
    l2 = []
    for s in l:
        l2.append(int(s)+2)
    return l2

In [13]:
rdd4 = rdd.map(foo)
rdd4.collect()

[[3, 4, 5, 6, 7], [5, 6, 7, 68, 79], [14, 45, 8, 9, 10], [14, 14, 35]]

#### flatMap()
- flatMap() maps each element like map(), then flattens (explodes) the results so the output is a single collection of items rather than a collection of lists.
- It will create a new RDD.
- rdd.flatMap(lambda x: x.split())

###### RDD Data
['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

###### Mapped Data
[['1', '2', '3', '4', '5'],
 ['3', '4', '5', '66', '77'],
 ['12', '43', '6', '7', '8'],
 ['12', '12', '33']]

###### Flatmapped Data
['1', '2', '3', '4', '5', '3', '4', '5', '66', '77', '12', '43', '6', '7', '8', '12', '12', '33']
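The difference between the mapped and flatmapped data can be reproduced in plain Python, which may help build intuition before running it on an RDD. This is a sketch of the semantics only: a list comprehension stands in for map() and `itertools.chain.from_iterable` stands in for the flattening step. The variable names are illustrative.

```python
from itertools import chain

lines = ['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

# map-like: one output element (a list of tokens) per input line
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are chained into one flat list of tokens
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))

print(len(mapped))       # number of lines
print(len(flat_mapped))  # number of tokens across all lines
```

map() preserves the number of elements (four lines in, four lists out), while flatMap() can change it (four lines in, eighteen tokens out).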

In [14]:
rdd.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [15]:
mappedRdd = rdd.map(lambda x: x.split(" "))
mappedRdd.collect()

[['1', '2', '3', '4', '5'],
 ['3', '4', '5', '66', '77'],
 ['12', '43', '6', '7', '8'],
 ['12', '12', '33']]

In [17]:
flatmappedRdd = rdd.flatMap(lambda x: x.split(" "))
flatmappedRdd.collect()

['1',
 '2',
 '3',
 '4',
 '5',
 '3',
 '4',
 '5',
 '66',
 '77',
 '12',
 '43',
 '6',
 '7',
 '8',
 '12',
 '12',
 '33']

#### filter()
- filter() keeps only the elements for which the given function returns True and removes the rest from the RDD
- It will create a new RDD
- rdd.filter(lambda x: x != 123)

In [20]:
rdd2 = rdd.filter(lambda x: x != '12 12 33')
rdd2.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8']

In [21]:
def foo(x):
    return 1==1  # True

rdd2 = rdd.filter(foo)
rdd2.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [22]:
def noo(x):
    return 1==2  # False

rdd2 = rdd.filter(noo)
rdd2.collect()

[]

In [24]:
def foo(x):
    if x == '12 12 33':
        return False
    else:
        return True

rdd2 = rdd.filter(foo)
rdd2.collect()


['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8']

#### distinct()
- distinct() returns the distinct elements of an RDD, dropping duplicates
- It will create a new RDD
- rdd.distinct()
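As a plain-Python sketch of what deduplication means here, `dict.fromkeys` can stand in for distinct(). One difference worth noting: this sketch keeps first-seen order, whereas Spark's distinct() makes no ordering guarantee, so results should be compared as sets. The token list is the flattened sample data used in this section.

```python
tokens = ['1', '2', '3', '4', '5', '3', '4', '5', '66', '77',
          '12', '43', '6', '7', '8', '12', '12', '33']

# dict keys are unique, so repeated tokens collapse into one entry
unique_tokens = list(dict.fromkeys(tokens))

print(len(tokens), len(unique_tokens))
print(unique_tokens)
```

Duplicates are decided per element, so distinct() on whole lines only removes a line when the entire line repeats; splitting into tokens first is what exposes repeated numbers.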

In [25]:
rdd.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [26]:
rdd2 = rdd.distinct()
rdd2.collect()

['3 4 5 66 77', '12 43 6 7 8', '12 12 33', '1 2 3 4 5']

In [27]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))
rdd2.collect()

['1',
 '2',
 '3',
 '4',
 '5',
 '3',
 '4',
 '5',
 '66',
 '77',
 '12',
 '43',
 '6',
 '7',
 '8',
 '12',
 '12',
 '33']

In [28]:
rdd3 = rdd2.distinct()
rdd3.collect()

['1', '4', '66', '77', '12', '8', '33', '2', '3', '5', '43', '6', '7']

In [30]:
rdd.flatMap(lambda x: x.split(" ")).distinct().collect()

['1', '4', '66', '77', '12', '8', '33', '2', '3', '5', '43', '6', '7']

#### groupByKey()
- groupByKey() groups the values of an RDD by key
- For groupByKey() to work properly, the data must be (key, value) pairs in the format (k,v), (k,v), (k2,v), (k2,v2)
   - Example: ("Apple",1), ("Ball",1), ("Apple",1)
- It will create a new RDD
- rdd.groupByKey()
- mapValues(list) is usually used to view the grouped data, because each group is returned as an iterable object rather than a list
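The grouping itself can be sketched in plain Python with a dictionary of lists. The helper `group_by_key` is a hypothetical name, and this only mirrors the result shape of groupByKey() followed by mapValues(list); it does not model how Spark moves data between nodes.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect all values that share a key into one list per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


pairs = [('ant', 3), ('cat', 3), ('mango', 5), ('ant', 3)]
grouped = group_by_key(pairs)
print(grouped)
```

Every value is kept, so a key that appears twice ends up with a two-element list.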

In [31]:
rdd = sc.textFile("sample_words.txt")
rdd.collect()

['this mango company animal',
 'cat dog ant mic laptop',
 'chair switch mobile am charger cover',
 'amanda any alarm ant']

In [35]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))
rdd2.collect()

['this',
 'mango',
 'company',
 'animal',
 'cat',
 'dog',
 'ant',
 'mic',
 'laptop',
 'chair',
 'switch',
 'mobile',
 'am',
 'charger',
 'cover',
 'amanda',
 'any',
 'alarm',
 'ant']

In [39]:
rdd3 = rdd2.map(lambda x: (x,len(x)))
rdd3.collect()

[('this', 4),
 ('mango', 5),
 ('company', 7),
 ('animal', 6),
 ('cat', 3),
 ('dog', 3),
 ('ant', 3),
 ('mic', 3),
 ('laptop', 6),
 ('chair', 5),
 ('switch', 6),
 ('mobile', 6),
 ('am', 2),
 ('charger', 7),
 ('cover', 5),
 ('amanda', 6),
 ('any', 3),
 ('alarm', 5),
 ('ant', 3)]

In [40]:
rdd3.groupByKey().collect()

[('this', <pyspark.resultiterable.ResultIterable at 0x7f1f91ce00a0>),
 ('mango', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89040>),
 ('cat', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89220>),
 ('ant', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89a60>),
 ('laptop', <pyspark.resultiterable.ResultIterable at 0x7f1f91c895b0>),
 ('chair', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89820>),
 ('switch', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89130>),
 ('mobile', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89850>),
 ('am', <pyspark.resultiterable.ResultIterable at 0x7f1f91c892b0>),
 ('company', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89e80>),
 ('animal', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89ee0>),
 ('dog', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89e50>),
 ('mic', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89fd0>),
 ('charger', <pyspark.resultiterable.ResultIterable at 0x7f1f91c89f70>),
 ('cover',

In [43]:
rdd3.groupByKey().mapValues(list).collect()

[('this', [4]),
 ('mango', [5]),
 ('cat', [3]),
 ('ant', [3, 3]),
 ('laptop', [6]),
 ('chair', [5]),
 ('switch', [6]),
 ('mobile', [6]),
 ('am', [2]),
 ('company', [7]),
 ('animal', [6]),
 ('dog', [3]),
 ('mic', [3]),
 ('charger', [7]),
 ('cover', [5]),
 ('amanda', [6]),
 ('any', [3]),
 ('alarm', [5])]

#### reduceByKey()
- reduceByKey() combines the values of each key in an RDD using the given function
- For reduceByKey() to work properly, the data must be (key, value) pairs in the format (k,v), (k,v), (k2,v), (k2,v2)
     - Example: ("Apple",1), ("Ball",1), ("Apple",1)
- It will create a new RDD
- rdd.reduceByKey(lambda x, y: x + y)
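A plain-Python sketch of the same idea: values that share a key are folded together two at a time with the supplied function. The helper `reduce_by_key` is a hypothetical name used only to show the semantics. In Spark, values may be merged in any order across partitions, so the function should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key together using func(accumulated, value)."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            # First value seen for this key becomes the starting accumulator.
            result[key] = value
    return result


pairs = [("Apple", 1), ("Ball", 1), ("Apple", 1)]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```

Unlike grouping, this keeps a single combined value per key instead of a list of all values.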