In [1]:
spark

We can access the SparkContext of the SparkSession object in either of the following ways.
* spark.sparkContext
* sc

In [2]:
spark.sparkContext

In [3]:
sc
# For backward compatibility, the SparkContext is also still available as sc.

## Creating RDD from Collections Using Parallelize Method

In [4]:
dataList = list(range(0, 10))
dataList

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [5]:
dataRdd = sc.parallelize(dataList)
dataRdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489

In [6]:
dataRdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [7]:
type(dataRdd)

pyspark.rdd.RDD

### The parallelize method takes two parameters
* Collection
* Number of slices (partitions)

In [8]:
sc.parallelize(dataList, 6).glom().collect()

[[0], [1, 2], [3, 4], [5], [6, 7], [8, 9]]
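The partition contents above are consistent with cutting the list at evenly spaced integer positions. The pure-Python sketch below reproduces that split without Spark. The helper name `slice_collection` and the position formula are assumptions made for illustration, not Spark's actual code.

```python
def slice_collection(data, num_slices):
    """Split data into num_slices contiguous chunks (hypothetical helper)."""
    n = len(data)
    chunks = []
    for i in range(num_slices):
        # Integer division places the cut points as evenly as possible.
        start = (i * n) // num_slices
        end = ((i + 1) * n) // num_slices
        chunks.append(data[start:end])
    return chunks


slices = slice_collection(list(range(0, 10)), 6)
print(slices)
```

With 10 elements and 6 slices, some chunks hold one element and others hold two, which is why the partitions are uneven.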

In [9]:
dataRdd2 = sc.parallelize([2, 5, 3, 1, 7, 9], 2).glom()
sorted(dataRdd2.collect())

[[1, 7, 9], [2, 5, 3]]

In [10]:
keyVal = [('a', 1), ('b', 5), ('c', 7), ('a', 2), ('a', 3), ('b', 4), ('c', 8)]

In [11]:
keyValRdd = sc.parallelize(keyVal)

## ReduceByKey
* The function passed to this method must be both associative and commutative.
* A reduction operator breaks a task into partial tasks, computing partial results that are combined into the final result.
* This allows certain serial operations to be performed in **parallel**, reducing the number of steps required.
* Each partial task stores its result in a private copy of the variable.
* These private copies are then merged into a shared copy at the end.
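The idea of partial results merged at the end can be sketched in plain Python, without Spark. The helpers `reduce_partition` and `merge_partials` and the two-way split of the data are hypothetical, chosen only to illustrate the description above. Spark decides the real partitioning itself.

```python
from operator import add


def reduce_partition(pairs, func):
    """Reduce the values of each key within one partition (private copy)."""
    partial = {}
    for key, value in pairs:
        partial[key] = func(partial[key], value) if key in partial else value
    return partial


def merge_partials(partials, func):
    """Merge the per-partition results into one shared result."""
    merged = {}
    for partial in partials:
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


key_val = [('a', 1), ('b', 5), ('c', 7), ('a', 2), ('a', 3), ('b', 4), ('c', 8)]
# Assumed split into two partitions, for illustration only.
partitions = [key_val[:4], key_val[4:]]
partials = [reduce_partition(p, add) for p in partitions]
totals = merge_partials(partials, add)
print(totals)
```

Because `add` is associative and commutative, the totals do not depend on how the pairs are split across partitions or on the order in which the partial results are merged.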

In [12]:
newKeyVal = keyValRdd.reduceByKey(lambda a, b: a + b).collect()

In [13]:
newKeyVal

[('b', 9), ('c', 15), ('a', 6)]

#### Alternative way
Both ways return a list, because the return type depends on the method used to collect the result:
* collect method -> returns a list
* collectAsMap method -> returns a map (a Python dict).

In [14]:
from operator import add
newKeyVal = keyValRdd.reduceByKey(add).collect()
newKeyVal

[('b', 9), ('c', 15), ('a', 6)]

## CountByKey

In [15]:
# Instead of using reduceByKey to find the total number of occurrences of each key,
# we can use the countByKey method.
# Refer the reduceByKey example where we 
countByKey = keyValRdd.countByKey()
countByKey

defaultdict(int, {'a': 3, 'b': 2, 'c': 2})

In [16]:
# countByKey method returns a hashmap (like dictionary) structure.
type(countByKey)

collections.defaultdict

In [17]:
countByKey['a']

3

The countByKey method works ONLY on RDDs of type (K, V) and returns a hashmap of (K, int) pairs holding the COUNT of each key [1].
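The same counts can be reproduced in plain Python, which makes it clear that only the keys matter and the values are ignored. This is a local sketch using `collections.Counter`, not Spark's implementation. The name `key_counts` is introduced here for illustration.

```python
from collections import Counter

key_val = [('a', 1), ('b', 5), ('c', 7), ('a', 2), ('a', 3), ('b', 4), ('c', 8)]

# Count how many pairs share each key; the values are never inspected.
key_counts = Counter(key for key, _ in key_val)
print(key_counts['a'])
```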

## CountByValue

In [18]:
keyVal = [('a', 1), ('b', 5), ('c', 7), ('a', 2), ('a', 3), ('b', 4), ('c', 8)]
keyVal.append(('a', 1))
keyVal.append(('k', 1))
sc.parallelize(keyVal).countByValue()

defaultdict(int,
            {('a', 1): 2,
             ('a', 2): 1,
             ('a', 3): 1,
             ('b', 4): 1,
             ('b', 5): 1,
             ('c', 7): 1,
             ('c', 8): 1,
             ('k', 1): 1})
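Unlike countByKey, countByValue treats each whole element (here, the full tuple) as the thing being counted. A plain-Python sketch of that behaviour, using `collections.Counter` as a stand-in and the illustrative name `value_counts`:

```python
from collections import Counter

key_val = [('a', 1), ('b', 5), ('c', 7), ('a', 2), ('a', 3), ('b', 4), ('c', 8)]
key_val.append(('a', 1))
key_val.append(('k', 1))

# Each distinct tuple is counted as a unit, so ('a', 1) and ('a', 2) are different.
value_counts = Counter(key_val)
print(value_counts[('a', 1)])
```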

## GroupByKey

In [19]:
tupKeyVal = keyValRdd.groupByKey()
tupKeyVal.collect()

[('b', <pyspark.resultiterable.ResultIterable at 0x7fd26014a208>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7fd26014a550>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x7fd26014a940>)]

In [20]:
list(tupKeyVal.collect()[0][1])

[5, 4]

In [21]:
tupKeyVal.map(lambda para: (para[0], list(para[1]))).collect()

[('b', [5, 4]), ('c', [7, 8]), ('a', [1, 2, 3])]

In [22]:
tupKeyVal.map(lambda para: (para[0], list(para[1]))).collectAsMap()

{'a': [1, 2, 3], 'b': [5, 4], 'c': [7, 8]}
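The grouping itself can be sketched without Spark: every value is appended to the list belonging to its key. This is a local illustration with `collections.defaultdict`, and the name `grouped` is introduced here. Spark returns a lazy `ResultIterable` per key rather than a list, which is why the cells above wrap it in `list(...)`.

```python
from collections import defaultdict

key_val = [('a', 1), ('b', 5), ('c', 7), ('a', 2), ('a', 3), ('b', 4), ('c', 8)]

# Collect all values that share a key, preserving their original order.
grouped = defaultdict(list)
for key, value in key_val:
    grouped[key].append(value)

print(dict(grouped))
```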

## Fun Part

In [23]:
from PyFiles import pipeDemo
from PyFiles import pipeDemoFunc

In [24]:
dataStr = ['james', 'john', 'vin']
dataRddStr = sc.parallelize(dataStr)

In [25]:
pipeRdd = dataRddStr.pipe(pipeDemo)
pipeFuncRdd = dataRddStr.map(lambda x: pipeDemoFunc.fun(x)).collect()

In [26]:
dataRddStr.collect()

['james', 'john', 'vin']

In [27]:
pipeFuncRdd

['Hello...james', 'Hello...john', 'Hello...vin']

In [28]:
# It's not working, most likely because pipe expects a shell command string rather than an imported Python module
#pipeRdd.collect()
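`RDD.pipe` takes a shell command as a string and streams the elements through that external process, one element per line, returning the lines the process prints. The sketch below imitates that flow locally with `subprocess`. The inline `child_script` is a hypothetical stand-in for an external script (the contents of `pipeDemo` are not shown in this notebook), and the greeting format is borrowed from the `pipeDemoFunc` output above.

```python
import subprocess
import sys

# Hypothetical external program: reads lines from stdin and greets each one.
child_script = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print('Hello...' + line.strip())\n"
)

elements = ['james', 'john', 'vin']

# Feed every element as one line of stdin and capture the process output.
result = subprocess.run(
    [sys.executable, "-c", child_script],
    input="\n".join(elements) + "\n",
    capture_output=True,
    text=True,
    check=True,
)
piped = result.stdout.splitlines()
print(piped)
```

In Spark, the equivalent call would pass the command as a string to `pipe`, with the script reading from standard input in the same way.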

### FlatMapValues
* Converts key-value pairs whose values are collections into one key-value pair per element of each collection.

In [29]:
mapVal = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
mapVal.flatMapValues(lambda x: x).collect()

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
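The same transformation written as a plain-Python comprehension shows what happens to each pair: the key is repeated once for every item in its collection. The name `flattened` is introduced here for illustration.

```python
pairs = [("a", ["x", "y", "z"]), ("b", ["p", "r"])]

# Emit one (key, item) pair for every item in each key's collection.
flattened = [(key, item) for key, values in pairs for item in values]
print(flattened)
```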

### Map vs FlatMap
* flatMap is the same as map, except for the shape of the result:
    * map returns a sequence of the same length as the original data, where each element is the sub-sequence produced from one element of the original data.
    * flatMap returns a single flattened sequence whose length equals the sum of the lengths of all the sub-sequences returned by map.

In [30]:
dataFm = ['abc', 'def', 'gh']

In [38]:
dataFmRdd = sc.parallelize(dataFm)
dataFmRdd.map(lambda x: tuple(x)).collect()

[('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'h')]

In [32]:
dataFmRdd.flatMap(lambda x: list(x)).collect()

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
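The length relationship between the two results can be checked locally. This sketch uses a list comprehension in place of map and `itertools.chain.from_iterable` in place of flatMap. It mirrors the two cells above but does not run on Spark.

```python
from itertools import chain

data = ['abc', 'def', 'gh']

# map: one output element (a tuple of characters) per input element.
mapped = [tuple(s) for s in data]

# flatMap: the per-element results are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(list(s) for s in data))

print(len(mapped), len(flat_mapped))
```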

In [62]:
kl = dataFmRdd.map(lambda x: tuple(x)).collect()
kl[2] = kl[2] + ('k', )
k = spark.createDataFrame(kl, ['a', 'b', 'c'])
k.collect()

[Row(a='a', b='b', c='c'), Row(a='d', b='e', c='f'), Row(a='g', b='h', c='k')]

In [66]:
k.

AttributeError: 'DataFrame' object has no attribute 'map'