# Advanced RDD Transformations and Actions
               - ANIMESH SINGH
> FOR PART 1 OF THIS NOTEBOOK REFER TO THIS LINK : [Part1](https://github.com/Animeshsinghiit/Spark-and-Pyspark/blob/main/pyspark%20practise%20part%201.ipynb)

In [None]:
import pyspark
from pyspark import SparkContext,SparkConf
conf=SparkConf().setAppName('RDD_practise').setMaster("local[*]")
sc=SparkContext(conf=conf)
print(sc)

In [None]:
import random
randlist=random.sample(range(0,40),10)
randlist

In [None]:
rdd1=sc.parallelize(randlist,4)
rdd2=rdd1.map(lambda x:x+3)

In [None]:
print('rdd1',rdd1.collect())
print('rdd2',rdd2.collect())

In [None]:
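# union(): returns all elements of both RDDs (duplicates are kept, no shuffle);
#     for RDDs without a partitioner the result keeps the partitions of both parents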
rdd2=rdd1.union(rdd2)
print("partitions :",rdd2.glom().collect())
print("Total number of partitions",rdd2.getNumPartitions())
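
To see why the partition counts add up, here is a plain-Python model (no Spark needed). In it, an RDD is a list of partitions and each partition is a list of elements. `model_union` is a hypothetical helper, not Spark code. The model assumes both RDDs have no partitioner, which is the case for RDDs created with `parallelize()`.

```python
# Plain-Python model: an "RDD" is a list of partitions, each partition a list.
# model_union is a hypothetical helper illustrating RDD.union() semantics.

def model_union(parts_a, parts_b):
    """Concatenate the partition lists: no shuffle, duplicates are kept."""
    return [list(p) for p in parts_a] + [list(p) for p in parts_b]

rdd1_parts = [[1], [2, 3], [4], [5, 6]]                 # 4 partitions
rdd2_parts = [[x + 3 for x in p] for p in rdd1_parts]  # like rdd1.map(lambda x: x + 3)

union_parts = model_union(rdd1_parts, rdd2_parts)
print(union_parts)       # [[1], [2, 3], [4], [5, 6], [4], [5, 6], [7], [8, 9]]
print(len(union_parts))  # 8 = 4 + 4
```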

In [None]:
# intersection(): returns the distinct elements present in both RDDs (this requires a shuffle)
rdd3=rdd1.intersection(rdd2)
print("partitions :",rdd3.glom().collect())
print("Number of partitions :",rdd3.getNumPartitions())

In [None]:
# get Number of Empty Partitions
count=0
for item in rdd3.glom().collect():
    if len(item)==0:
        count+=1
count
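
Empty partitions appear because `intersection()` shuffles its output. Each distinct common element is sent to a partition chosen from its hash, and nothing guarantees that every partition receives an element. The plain-Python model below makes a few assumptions:

- It uses a hash partitioner of the form `hash(x) % num_partitions`. For small non-negative integers, Python's `hash(x)` is `x` itself.
- It picks 12 output partitions for illustration. The number Spark actually uses depends on the parent RDDs and on `spark.default.parallelism`.
- `model_intersection` is a hypothetical helper.

The block also shows a more compact way to count empty partitions.

```python
# Plain-Python model of intersection() followed by a hash-partitioned shuffle.
# model_intersection is a hypothetical helper, not Spark's implementation.

def model_intersection(parts_a, parts_b, num_partitions):
    common = {x for p in parts_a for x in p} & {x for p in parts_b for x in p}
    out = [[] for _ in range(num_partitions)]
    for x in sorted(common):
        out[hash(x) % num_partitions].append(x)  # target partition chosen by hash
    return out

rdd1_parts = [[3, 17], [8, 21], [30], [12, 25]]
rdd2_parts = rdd1_parts + [[x + 3 for x in p] for p in rdd1_parts]  # like rdd1.union(rdd1 + 3)

inter = model_intersection(rdd1_parts, rdd2_parts, 12)
empty = sum(1 for p in inter if not p)  # same count as the loop above
print(inter)
print("empty partitions:", empty)  # 5
```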

In [None]:
# coalesce(numPartitions): decreases the number of partitions by merging existing ones (no full shuffle by default)
rdd3.coalesce(2).glom().collect()
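
Below is a simplified plain-Python model of `coalesce()`. Existing partitions are merged whole into fewer groups, so no element is shuffled on its own. The grouping rule used here (consecutive partitions, spread evenly) is an assumption for illustration. Spark's coalescer also considers data locality, so the partitions that end up together can differ.

Without a shuffle, `coalesce()` cannot increase the number of partitions. To increase them, use `repartition(n)`, which is `coalesce(n, shuffle=True)`. `model_coalesce` is a hypothetical helper.

```python
# Simplified plain-Python model of RDD.coalesce(n) without shuffle.
# model_coalesce is hypothetical; Spark's real grouping also uses data locality.

def model_coalesce(parts, n):
    if n >= len(parts):
        # without a shuffle the partition count cannot grow
        return [list(p) for p in parts]
    groups = [[] for _ in range(n)]
    for i, p in enumerate(parts):
        groups[i * n // len(parts)].extend(p)  # whole partitions are merged
    return groups

parts = [[1, 2], [], [3], [4, 5]]
print(model_coalesce(parts, 2))       # [[1, 2], [3, 4, 5]]
print(len(model_coalesce(parts, 8)))  # 4, not 8
```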

In [None]:
# takeSample(withReplacement, num, seed): returns a random sample of num elements from the RDD as a Python list
#     WARNING: use it only for a small num, as the whole sample is loaded into the driver's memory
rdd3.takeSample(False,2)
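
You can check the meaning of `withReplacement` and `seed` with Python's own `random` module. The model below only illustrates the semantics; it is not how Spark samples. Spark counts the RDD, samples a fraction of each partition in parallel, and collects the result to the driver. `model_take_sample` is a hypothetical helper.

```python
import random

# Hypothetical model of takeSample(withReplacement, num, seed) semantics.
def model_take_sample(data, with_replacement, num, seed=None):
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    if with_replacement:
        return rng.choices(data, k=num)  # an element may be picked more than once
    # without replacement each element is picked at most once,
    # so asking for more than len(data) returns every element
    return rng.sample(data, min(num, len(data)))

data = [3, 8, 12, 17, 21, 25, 30]
print(model_take_sample(data, False, 2, seed=7))
print(model_take_sample(data, True, 10, seed=7))  # may contain repeats
print(len(model_take_sample(data, False, 100)))   # 7
```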

In [None]:
# takeOrdered(num, key=None): returns the first num elements in ascending order (or ordered by key)
#     WARNING: use it only for a small num, as the result is loaded into the driver's memory
rdd3.takeOrdered(4)

In [None]:
# takeOrdered() with a key: key=lambda x: -x orders by the negated value, so it returns the 4 largest elements (descending order).
# It does not sort the whole RDD: each partition first selects its own 4 candidates in parallel,
# and only these small partial results are merged on the driver, so the cost grows with num; keep num small.
rdd3.takeOrdered(4,key=lambda x:-x) # descending order
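
`takeOrdered()` avoids sorting the whole dataset, and you can model how with `heapq.nsmallest`. Each partition keeps only its `num` smallest elements, then the small partial lists are merged. This is a simplified reflection of how PySpark implements `takeOrdered()`. `model_take_ordered` itself is a hypothetical helper.

```python
import heapq
from functools import reduce

# Hypothetical plain-Python model of takeOrdered(num, key).
def model_take_ordered(parts, num, key=None):
    # Step 1 (runs per partition, in parallel in Spark): keep the num smallest.
    partials = [heapq.nsmallest(num, p, key=key) for p in parts]
    # Step 2 (driver): merge the small partial lists, keeping num elements.
    return reduce(lambda a, b: heapq.nsmallest(num, a + b, key=key), partials, [])

parts = [[12], [25], [], [3], [], [17], [30], [], [8], [21], [], []]
print(model_take_ordered(parts, 4))                    # [3, 8, 12, 17]
print(model_take_ordered(parts, 4, key=lambda x: -x))  # [30, 25, 21, 17]
```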

In [None]:
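# reduce(f): aggregates all elements with a commutative and associative function; here it computes the product (0 if randlist contains 0)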
rdd3.reduce(lambda x,y:x*y)
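
`reduce()` first reduces each partition locally and then combines the partial results on the driver. This is why the function should be commutative and associative. The plain-Python model below shows the consequence: multiplication gives the same answer for any partitioning, but subtraction does not. `model_rdd_reduce` is a hypothetical helper.

```python
from functools import reduce
from operator import mul, sub

# Hypothetical plain-Python model of RDD.reduce(f).
def model_rdd_reduce(parts, f):
    partials = [reduce(f, p) for p in parts if p]  # per partition (executors)
    return reduce(f, partials)                     # combine partial results (driver)

# Same data [1, 2, 3, 4], two different partitionings
print(model_rdd_reduce([[1, 2], [3, 4]], mul), model_rdd_reduce([[1], [2, 3, 4]], mul))  # 24 24
print(model_rdd_reduce([[1, 2], [3, 4]], sub), model_rdd_reduce([[1], [2, 3, 4]], sub))  # 0 6
```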

In [None]:
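# reduceByKey(f): like reduce(), but merges the values of each key separately;
#     it is a transformation (returns a new RDD), whereas reduce() is an action
rdd_rbk=sc.parallelize([(1,2),(1,3),(2,3),(3,4),(2,4),(5,2)],2)
print('partitions :',rdd_rbk.glom().collect())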
rbk=rdd_rbk.reduceByKey(lambda x,y:x+y)
print('rdd after RBK :',rbk.collect())
print("all keys :",rbk.keys().collect())
print("all values :",rbk.values().collect())
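
The practical advantage of `reduceByKey()` is the map-side combine: values are merged inside each partition before the shuffle, so fewer records cross the network. The plain-Python model below counts how many records would be shuffled. It assumes `rdd_rbk` is split into the two partitions `[(1,2),(1,3),(2,3)]` and `[(3,4),(2,4),(5,2)]`. `model_reduce_by_key` is a hypothetical helper.

```python
from operator import add

# Hypothetical plain-Python model of reduceByKey(f) with a map-side combine.
def model_reduce_by_key(parts, f):
    # 1) combine values per key inside each partition (before the shuffle)
    local_results = []
    for p in parts:
        local = {}
        for k, v in p:
            local[k] = f(local[k], v) if k in local else v
        local_results.append(local)
    # 2) only these partial (key, value) records are shuffled and merged
    shuffled = sum(len(local) for local in local_results)
    merged = {}
    for local in local_results:
        for k, v in local.items():
            merged[k] = f(merged[k], v) if k in merged else v
    return merged, shuffled

parts = [[(1, 2), (1, 3), (2, 3)], [(3, 4), (2, 4), (5, 2)]]
result, shuffled = model_reduce_by_key(parts, add)
print(result)    # {1: 5, 2: 7, 3: 4, 5: 2}
print(shuffled)  # 5 records shuffled instead of all 6
```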

In [None]:
# sortByKey(ascending=True): sorts the (key, value) pairs by key; pass False for descending order
print('Default :',rbk.sortByKey().collect())
print('when False :',rbk.sortByKey(False).collect())
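
How can `sortByKey()` return a globally sorted result when the data is spread over several partitions? It range-partitions the pairs, sending keys below a boundary to one partition and larger keys to the next, and then sorts inside each partition. Reading the partitions in order therefore yields sorted output. PySpark picks the boundaries by sampling the keys. The plain-Python model below takes them as a hand-chosen `bounds` list instead. `model_sort_by_key` is a hypothetical helper.

```python
import bisect

# Hypothetical plain-Python model of sortByKey(ascending) via range partitioning.
def model_sort_by_key(pairs, bounds, ascending=True):
    n = len(bounds) + 1
    parts = [[] for _ in range(n)]
    for k, v in pairs:
        p = bisect.bisect_left(bounds, k)  # range partition for this key
        parts[p if ascending else n - 1 - p].append((k, v))
    # sort within each partition
    return [sorted(p, key=lambda kv: kv[0], reverse=not ascending) for p in parts]

rbk = [(2, 7), (1, 5), (3, 4), (5, 2)]
asc = model_sort_by_key(rbk, bounds=[2])
desc = model_sort_by_key(rbk, bounds=[2], ascending=False)
print(asc)   # [[(1, 5), (2, 7)], [(3, 4), (5, 2)]]
print(desc)  # [[(5, 2), (3, 4)], [(2, 7), (1, 5)]]
print([kv for p in asc for kv in p])  # globally sorted when read partition by partition
```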

In [None]:
# countByKey(): Counts the number of values associated with each key
print('Count By Key :',rdd_rbk.countByKey())
print('In better format (key,no_of_values) :',rdd_rbk.countByKey().items())
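
`countByKey()` is an action: it returns an ordinary Python dictionary (a `defaultdict`) to the driver. For small examples, you can reproduce its result in plain Python with `collections.Counter`. When there are very many distinct keys, a distributed alternative keeps the counts in an RDD: `rdd_rbk.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)`.

```python
from collections import Counter

# Plain-Python equivalent of rdd_rbk.countByKey() for the same pairs.
pairs = [(1, 2), (1, 3), (2, 3), (3, 4), (2, 4), (5, 2)]
counts = Counter(k for k, _ in pairs)
print(dict(counts))  # {1: 2, 2: 2, 3: 1, 5: 1}
```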

In [None]:
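# groupByKey(): groups all values of each key into one iterable
#     WARNING: it shuffles every (key, value) pair across the network without combining values first,
#     and all values of one key must fit in an executor's memory, so it is costly on big data.
#     Prefer reduceByKey() or aggregateByKey() when the goal is one aggregate per key.
#     WHEN TO USE: when the full list of values per key is really needed, typically after earlier operations have reduced the data to a small size.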
rddgroup=rdd_rbk.groupByKey()
print("output: ",rddgroup.collect())
print("---------------------------------------------------------------------------------------------------")
for key,values in rddgroup.collect():
    print("key :{key}".format(key=key),"|","values :{values}".format(values=list(values)))
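
A plain-Python model of `groupByKey()` makes the cost visible. With no map-side combine, every one of the 6 pairs is shuffled. `reduceByKey()` would instead merge the values inside each partition first, and for the same two-partition split it would shuffle only 5 records. `model_group_by_key` is a hypothetical helper. Note that Spark does not guarantee the order of values inside each group.

```python
# Hypothetical plain-Python model of groupByKey() (no map-side combine).
def model_group_by_key(parts):
    shuffled = sum(len(p) for p in parts)  # every (key, value) record is moved
    groups = {}
    for p in parts:
        for k, v in p:
            groups.setdefault(k, []).append(v)
    return groups, shuffled

parts = [[(1, 2), (1, 3), (2, 3)], [(3, 4), (2, 4), (5, 2)]]
groups, shuffled = model_group_by_key(parts)
print(groups)    # {1: [2, 3], 2: [3, 4], 3: [4], 5: [2]}
print(shuffled)  # 6
# Aggregating afterwards gives the same sums as reduceByKey, but after moving more data
print({k: sum(vs) for k, vs in groups.items()})  # {1: 5, 2: 7, 3: 4, 5: 2}
```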

In [None]:
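# lookup(key): returns a list of all values for the given key
#     (on an RDD with a known partitioner, only the partition the key maps to is searched)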
rdd_rbk.lookup(2)
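
`lookup()` scans every partition unless the RDD has a known partitioner, for example after `partitionBy()`. In that case only the partition the key maps to is searched. The plain-Python model below assumes a hash partitioner of the form `hash(key) % num_partitions`. `model_partition_by` and `model_lookup` are hypothetical helpers.

```python
# Hypothetical plain-Python model of partitionBy() + lookup().
def model_partition_by(pairs, num_partitions):
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts

def model_lookup(parts, key, has_partitioner=False):
    if has_partitioner:
        searched = [parts[hash(key) % len(parts)]]  # only one partition
    else:
        searched = parts                            # every partition
    return [v for p in searched for k, v in p if k == key]

pairs = [(1, 2), (1, 3), (2, 3), (3, 4), (2, 4), (5, 2)]
unpartitioned = [pairs[:3], pairs[3:]]  # like parallelize(pairs, 2)
hashed = model_partition_by(pairs, 2)   # like partitionBy(2)
print(model_lookup(unpartitioned, 2))                 # [3, 4]
print(model_lookup(hashed, 2, has_partitioner=True))  # [3, 4]
```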

### When to use cache() or persist()?
> If an RDD is needed many times, it's better to keep it in a cache. RDDs are evaluated lazily, and by default Spark does not keep an RDD's data once an action has finished. Every later action that uses the RDD recomputes it from its lineage, re-reading the source and re-running all transformations. Caching stores the computed partitions (in executor memory and/or on disk), so later actions reuse them instead of recomputing them.
>
> So if an RDD is used many times, keep it in cache for faster processing.
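
You can model the effect of caching in plain Python by counting how often an RDD's lineage is evaluated. `ModelRDD` below is a hypothetical stand-in, not a Spark class. Without `persist()`, every action recomputes the data. With it, the first action computes and stores the result and later actions reuse it. As in Spark, `persist()` here is lazy and computes nothing by itself.

```python
# Hypothetical plain-Python model of RDD caching; not a Spark API.
class ModelRDD:
    def __init__(self, compute):
        self.compute = compute      # stands in for the RDD's lineage
        self.compute_calls = 0
        self._persist = False
        self._cached = None

    def persist(self):
        self._persist = True        # lazy: nothing is computed here
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached     # reuse the stored result
        self.compute_calls += 1     # lineage is evaluated again
        result = self.compute()
        if self._persist:
            self._cached = result
        return result

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


def expensive_lineage():
    # e.g. read the source, then map and filter
    return [x * x for x in range(10) if x % 2 == 0]

plain = ModelRDD(expensive_lineage)
cached = ModelRDD(expensive_lineage).persist()
print(cached.compute_calls)  # 0: persist() alone does not compute
for rdd in (plain, cached):
    rdd.count()
    rdd.collect()
    rdd.count()
print(plain.compute_calls, cached.compute_calls)  # 3 1
```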


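> Both cache() and persist() can be used for this. For RDDs, cache() is shorthand for persist() with the default storage level MEMORY_ONLY, while persist() also accepts other storage levels.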

In [None]:
rdd_rbk.persist()

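> Now rdd_rbk is marked for caching. persist() is lazy: the partitions are stored the first time an action computes them. Cached partitions can still be evicted (least-recently-used first) when memory runs short. With the default storage level, Spark then recomputes them from the lineage when they are needed again. Call unpersist() to remove them explicitly.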

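### Persistence
> When the data is large and memory is limited, the default MEMORY_ONLY level cannot cache every partition. Partitions that don't fit are not cached and are recomputed each time they are needed, so tasks stay slow even with parallel processing. persist() accepts a StorageLevel that controls where partitions are kept. With MEMORY_AND_DISK, partitions that don't fit in memory are stored on disk and read from there instead of being recomputed.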
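
Below is a rough plain-Python model of the difference between `MEMORY_ONLY` and `MEMORY_AND_DISK` when only some partitions fit in memory. For simplicity, it assumes that the first `memory_slots` partitions fit and are never evicted. In Spark, which partitions stay in memory depends on its memory manager. Under `MEMORY_ONLY`, the partitions that do not fit are recomputed on every later action. Under `MEMORY_AND_DISK`, they are written to disk and read back. `place_partitions` and `recomputations` are hypothetical helpers.

```python
# Rough plain-Python model of two storage levels; helpers are hypothetical.
def place_partitions(num_partitions, memory_slots, level):
    placement = []
    for pid in range(num_partitions):
        if pid < memory_slots:
            placement.append("memory")
        elif level == "MEMORY_AND_DISK":
            placement.append("disk")       # spilled, read back from disk later
        else:  # "MEMORY_ONLY"
            placement.append("recompute")  # not cached, rebuilt from lineage
    return placement

def recomputations(placement, later_actions):
    return later_actions * placement.count("recompute")

for level in ("MEMORY_ONLY", "MEMORY_AND_DISK"):
    placement = place_partitions(num_partitions=8, memory_slots=5, level=level)
    print(level, placement.count("disk"), recomputations(placement, later_actions=4))
# MEMORY_ONLY 0 12
# MEMORY_AND_DISK 3 0
```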


In [None]:
from pyspark import StorageLevel
rdd1.persist(StorageLevel.MEMORY_AND_DISK)