### Importing required libraries and setting up dependencies

In [2]:
import os
import sys

In [3]:
!ls "/usr/spark2.4.3"

bin   data	jars	    LICENSE   NOTICE  R		 RELEASE  yarn
conf  examples	kubernetes  licenses  python  README.md  sbin


In [4]:
!ls "/usr/spark2.4.3/python/lib"

py4j-0.10.7-src.zip  PY4J_LICENSE.txt  pyspark.zip


In [5]:
os.environ["SPARK_HOME"] = "/usr/spark2.4.3"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 instead if you want Python 2 (the f-strings used later in this notebook need Python 3.6+)
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

### SparkSession: Single Entry Point in Spark 2.x or Higher
- SparkSession is the single entry point to Spark functionality; the master URL passed to master() decides where the application runs:
- local : Run Spark locally with one worker thread (i.e. no parallelism at all).
- local[K] : Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). For example:
  - local[1] : Only one thread is available for tasks. In Spark Streaming, if a receiver is running on it, no resources are left to process the received data.
- local[*] : Run Spark locally with as many worker threads as there are logical cores on your machine.
- local[K,F] / local[*,F] : Same as local[K] / local[*], but with F maxFailures, i.e. the job is aborted once a task has failed F times.
- yarn : Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
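To make the local master formats above concrete, here is a small hypothetical helper (parse_local_master is not a PySpark API) that decodes a local master string into a worker-thread count and an optional maxFailures value. It is a sketch that assumes only the local forms listed above need handling; Spark parses these strings internally on its own.

```python
import os
import re

# Hypothetical helper, not part of PySpark: decodes the local master URL forms listed above.
LOCAL_PATTERN = re.compile(r"local\[(\*|\d+)(?:,(\d+))?\]")

def parse_local_master(master, cores=None):
    """Return (worker_threads, max_failures); max_failures is None when F is not given."""
    if cores is None:
        cores = os.cpu_count() or 1
    if master == "local":
        return 1, None                      # one worker thread, no parallelism
    match = LOCAL_PATTERN.fullmatch(master)
    if match is None:
        raise ValueError(f"not a local master URL: {master!r}")
    threads = cores if match.group(1) == "*" else int(match.group(1))
    max_failures = int(match.group(2)) if match.group(2) else None
    return threads, max_failures

print(parse_local_master("local[4]"))              # (4, None)
print(parse_local_master("local[*,3]", cores=12))  # (12, 3)
```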

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Aman_Spark_First_App").master("local[*]").enableHiveSupport().getOrCreate()
# master("yarn")     # If YARN is the resource manager, pass 'yarn' to master(); Mesos uses a 'mesos://host:port' URL instead

sc = spark.sparkContext

In [6]:
from platform import python_version

print(f'Our SparkContext is : {sc} \n')
print(f'Name of our Spark Application is : {sc.appName} \n')
print(f'Our SparkSession object is : {spark} \n')
print(f'Spark Version is : {sc.version} \n')
print(f'Attributes and methods available on the SparkContext are \n : {dir(sc)} \n')
print(f'Location of our Python executable is : {sys.executable} \n')
print(f'Python version info is :  {sys.version_info} \n')
print(f'Python version is : {python_version()} \n')

Our SparkContext is : <SparkContext master=local[*] appName=Aman_Spark_First_App> 

Name of our Spark Application is : Aman_Spark_First_App 

Our SparkSession object is : <pyspark.sql.session.SparkSession object at 0x7fbe70189860> 

Spark Version is : 2.4.3 

Attributes and methods available on the SparkContext are 
 : ['PACKAGE_EXTENSIONS', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_accumulatorServer', '_active_spark_context', '_batchSize', '_callsite', '_checkpointFile', '_conf', '_dictToJavaMap', '_do_init', '_encryption_enabled', '_ensure_initialized', '_gateway', '_getJavaStorageLevel', '_initialize_context', '_javaAccumulator', '_jsc', '_jvm', '_

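### Comparing Python vs Spark Data Structure Properties
- Basic commands for creating and inspecting an RDD
- Transformations (e.g. map, filter) never modify an RDD: RDDs are immutable, so a transformation returns a new RDD. Transformations are lazy and only record how the new RDD should be computed.
- Actions (e.g. collect, count) trigger the actual computation and return the result to the driver as a regular Python object. After an action, we can use standard Python on these objects again.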
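The transformation/action split can be illustrated without a cluster. The sketch below is a toy, single-machine model (the class ToyRDD is hypothetical and is not how PySpark is implemented): transformations only record a function and return a new object, leaving the original untouched, and nothing is computed until an action such as collect() or count() runs.

```python
class ToyRDD:
    """Toy model of lazy, immutable RDD transformations (not PySpark)."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # recorded transformations, not yet executed

    # Transformations: return a NEW ToyRDD with one more recorded step.
    def map(self, func):
        return ToyRDD(self._data, self._steps + (("map", func),))

    def filter(self, predicate):
        return ToyRDD(self._data, self._steps + (("filter", predicate),))

    # Actions: run every recorded step and return a plain Python object.
    def collect(self):
        result = self._data
        for kind, func in self._steps:
            if kind == "map":
                result = [func(x) for x in result]
            else:
                result = [x for x in result if func(x)]
        return list(result)

    def count(self):
        return len(self.collect())


seen = []
base = ToyRDD([1, 2, 3, 4])
plus_five = base.map(lambda x: seen.append(x) or x + 5)  # only recorded, nothing runs yet
print(seen)                                              # []
print(plus_five.filter(lambda x: x > 7).collect())       # [8, 9]
print(base.collect())                                    # [1, 2, 3, 4]  (original unchanged)
```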

In [143]:
print('Python List Properties \n')
list1 = [1,2,3,4]
print(list1)
print(type(list1))
print(len(list1))

print('***'*15)

print('Spark RDD Properties \n')
list1_int = sc.parallelize(list1)                   # Create RDD from python List
print(list1_int.getNumPartitions())                 # Checking default partitions in RDD
list1_int = sc.parallelize(list1,15)                # Specifying partitions in RDD
print(list1_int.getNumPartitions())
print(list1_int)
print(type(list1_int))
print(list1_int.collect())                          # Printing elements of RDD
print(list1_int.count())                            # Printing COUNT of elements in RDD

Python List Properties 

[1, 2, 3, 4]
<class 'list'>
4
*********************************************
Spark RDD Properties 

12
15
ParallelCollectionRDD[40] at parallelize at PythonRDD.scala:195
<class 'pyspark.rdd.RDD'>
[1, 2, 3, 4]
4


In [142]:
print('Python List Properties \n')
list1 = ['aman','deep','samra']
print(list1)
print(type(list1))
print(len(list1))

print('***'*15)

print('Spark RDD Properties \n')
list11_str = sc.parallelize(list1)                   # Create RDD from python List
print(list11_str.getNumPartitions())                 # Checking default partitions in RDD
list11_str = sc.parallelize(list1,3)                # Specifying partitions in RDD
print(list11_str.getNumPartitions())
print(list11_str)
print(type(list11_str))
print(list11_str.collect())                          # Printing elements of RDD
print(list11_str.count())                            # Printing COUNT of elements in RDD

Python List Properties 

['aman', 'deep', 'samra']
<class 'list'>
3
*********************************************
Spark RDD Properties 

12
3
ParallelCollectionRDD[37] at parallelize at PythonRDD.scala:195
<class 'pyspark.rdd.RDD'>
['aman', 'deep', 'samra']
3


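### Partitioning 
- mapPartitions - Applies a function once per partition (the function receives an iterator over that partition's elements) instead of once per element. Useful when processing, aggregation or setup work should happen at partition level rather than at global level.
- spark.default.parallelism - Default number of partitions Spark uses for RDDs created with parallelize and for RDD shuffles such as reduceByKey and join when no number is given. In local mode it defaults to the number of cores (hence the 12 partitions seen above); on a cluster it is the total number of executor cores or 2, whichever is larger.
- spark.sql.shuffle.partitions - Number of partitions used when shuffling data for joins and aggregations in Spark SQL / DataFrames. It defaults to 200 and can be set in the Spark configuration or at runtime.
- 'repartition' - Increases or decreases the number of partitions of an RDD as required.
- repartition creates a new RDD by doing a full shuffle, producing new partitions with the data distributed evenly.
- New partitions are created. 
- Although the repartition itself costs a full shuffle, later stages often run faster than after coalesce because the partitions are evenly sized and Spark can balance the work across them.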
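mapPartitions hands the function one iterator per partition instead of one element at a time, so per-partition setup or aggregation happens once per partition. Below is a pure-Python sketch that models partitions as a list of lists; map_partitions is a hypothetical stand-in, and in PySpark the equivalent call would be rdd.mapPartitions(func).

```python
def map_partitions(partitions, func):
    """Hypothetical model of RDD.mapPartitions: func is called once per partition with an iterator."""
    return [list(func(iter(part))) for part in partitions]

def partition_sum(elements):
    # Called once per partition, not once per element
    yield sum(elements)

partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]     # same layout as the glom() output in the next cell
per_partition = map_partitions(partitions, partition_sum)
print(per_partition)                                # [[3], [12], [21]]
print([x for part in per_partition for x in part])  # [3, 12, 21], the flat list collect() would return
```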

In [24]:
python_list = [i for i in range(9)]
print(f'Python list has elements : {python_list}')
rdd = sc.parallelize(python_list,3)                   
print(f'No. of partitions in rdd is : {rdd.getNumPartitions()}')
print(f'Distribution of data across partitions : {rdd.glom().collect()}')   # glom() turns each partition into a list
repartitioned_rdd = rdd.repartition(4)
print(f'No. of partitions AFTER repartitioning is : {repartitioned_rdd.getNumPartitions()}')
print(f'Distribution of data AFTER repartitioning : {repartitioned_rdd.glom().collect()}')   # With only 9 elements the split is not perfectly even

Python list has elements : [0, 1, 2, 3, 4, 5, 6, 7, 8]
No. of partitions in rdd is : 3
Distribution of data across partitions : [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
No. of partitions AFTER repartitioning is : 4
Distribution of data AFTER repartitioning : [[6, 7, 8], [3, 4, 5], [], [0, 1, 2]]


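#### COALESCE 
- Creates a new RDD with fewer partitions. Without shuffle=True it can only decrease the number of partitions, because it avoids a full data shuffle, which is an expensive process.
- Existing partitions are merged: executors only move the data out of the partitions that are being removed.
- If data is spread across 4 nodes and we coalesce it to 2 nodes, the data already on the 2 retained nodes stays in place; only the data from the other 2 nodes is moved.
- The resulting partitions can be unequal in size, and unequal partitions generally make later stages slower.
- No new partitions are created from scratch; it is basically a kind of compaction of the existing ones.
- Asking for more partitions than the RDD currently has does nothing: the partition count stays the same (3, not 4, in the cell below).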
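A simplified pure-Python model of coalesce without shuffle follows. The helper is hypothetical, and Spark's real coalescer also takes data locality into account: here whole partitions are merged into contiguous groups, no single element moves on its own, and asking for more partitions than exist changes nothing. With the data used in the next cell, this model happens to give the same grouping as the notebook output.

```python
def coalesce(partitions, num_partitions):
    """Simplified model: merge neighbouring partitions without shuffling individual elements."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    current = len(partitions)
    if num_partitions >= current:
        return [list(p) for p in partitions]   # cannot grow without a shuffle
    merged = []
    for group in range(num_partitions):
        # Each output partition takes a contiguous range of input partitions
        start = group * current // num_partitions
        end = (group + 1) * current // num_partitions
        merged.append([x for part in partitions[start:end] for x in part])
    return merged

parts = [[5, 6], [7, 8], [9, 10]]
print(coalesce(parts, 2))   # [[5, 6], [7, 8, 9, 10]]
print(coalesce(parts, 4))   # [[5, 6], [7, 8], [9, 10]]  (still 3 partitions)
```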

In [32]:
python_list = [i for i in range(5,11)]
print(python_list)
rdd = sc.parallelize(python_list,3)  
print(f'No. of partitions done on rdd is : {rdd.getNumPartitions()}')
print(f'Actual distribution is : {rdd.glom().collect()}')

print('*****'*15)

coalesced_rdd = rdd.coalesce(2)
print(f'No. of partitions done on rdd AFTER COALESCE is : {coalesced_rdd.getNumPartitions()}')
print(f'COALESCE distribution is : {coalesced_rdd.glom().collect()}')

print('*****'*15)

coalesced_rdd_n = rdd.coalesce(4)
print(f'No. of partitions AFTER COALESCE to MORE partitions than currently exist (4 > 3) is : {coalesced_rdd_n.getNumPartitions()}')
coalesced_rdd_n.glom().collect()

[5, 6, 7, 8, 9, 10]
No. of partitions done on rdd is : 3
Actual distribution is : [[5, 6], [7, 8], [9, 10]]
***************************************************************************
No. of partitions done on rdd AFTER COALESCE is : 2
COALESCE distribution is : [[5, 6], [7, 8, 9, 10]]
***************************************************************************
No. of partitions AFTER COALESCE to MORE partitions than currently exist (4 > 3) is : 3


[[5, 6], [7, 8], [9, 10]]

### Basic Functions :
- MAP : Transforms the data element-wise: it applies a function to each element of the RDD and returns exactly one output element per input element. One element in -> one element out. It returns a new RDD.
- FLATMAP : Similar to MAP, but the function may return 0 or more elements for each input (depending on the business logic), and the results are flattened into a single 1D RDD. One element in -> 0 or more elements out (a collection). Really helpful with nested JSON structures. It is like unlist() in R programming.
- FILTER - Returns a new RDD with only the elements that satisfy the predicate, similar to the WHERE clause of SQL

- mapValues, flatMapValues - On pair (key, value) RDDs these transform only the values and keep the keys, so Spark can preserve the existing partitioning. map and flatMap may change the keys, so the partitioning information is dropped.
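The one-to-one versus one-to-many difference can be checked with plain Python on the same strings held in list11_str. map_like and flat_map_like are hypothetical stand-ins for RDD.map and RDD.flatMap on a single partition; the last line shows why flatMap over a string returns single characters.

```python
from itertools import chain

words = ['aman', 'deep', 'samra']

def map_like(func, data):
    # Exactly one output per input element
    return [func(x) for x in data]

def flat_map_like(func, data):
    # func returns an iterable per element (0..n items); the results are flattened
    return list(chain.from_iterable(func(x) for x in data))

print(map_like(lambda w: w.split(','), words))       # [['aman'], ['deep'], ['samra']]
print(flat_map_like(lambda w: w.split(','), words))  # ['aman', 'deep', 'samra']
print(flat_map_like(lambda w: [], words))            # []  (zero outputs per element is allowed)
print(flat_map_like(str.title, words)[:4])           # ['A', 'm', 'a', 'n']  (a string is iterated char by char)
```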

In [21]:
print(list1_int.map(lambda x : x+5).collect())
print(list1_int.map(lambda x : x**2).collect())
print(list1_int.filter(lambda x : x > 2).collect())
print(list1_int.filter(lambda x : x % 2 == 0).collect())

[6, 7, 8, 9]
[1, 4, 9, 16]
[3, 4]
[2, 4]


In [26]:
print(list11_str.zipWithIndex().lookup('deep'))               # Index of 'deep' in the RDD
print(list11_str.map(lambda x: x.split(',')).collect())     # Result is 2D structure
print(list11_str.map(lambda x: (x,1)).collect())
print(list11_str.map(lambda x:  x.title()).collect())
print(list11_str.map(lambda x:  x.upper()).collect())

print('***'*15)
print(list11_str.filter(lambda x: 'a' in x).collect())
print(list11_str.filter(lambda x: x.split(',')).collect())   # A non-empty list is truthy, so every element is kept

print('***'*15)
print(list11_str.flatMap(lambda x: x.split(',')).collect())   # Result is flattened 1D structure
print(list11_str.flatMap(lambda x: (x,1)).collect())           # Each tuple is flattened into its 2 elements
print(list11_str.flatMap(lambda x:  x.title()).collect())      # A string is iterable, so it is flattened into characters

[1]
[['aman'], ['deep'], ['samra']]
[('aman', 1), ('deep', 1), ('samra', 1)]
['Aman', 'Deep', 'Samra']
['AMAN', 'DEEP', 'SAMRA']
*********************************************
['aman', 'samra']
['aman', 'deep', 'samra']
*********************************************
['aman', 'deep', 'samra']
['aman', 1, 'deep', 1, 'samra', 1]
['A', 'm', 'a', 'n', 'D', 'e', 'e', 'p', 'S', 'a', 'm', 'r', 'a']


In [141]:
# n_gram logic with n_gram = 2
list1 = 'I am applying for Jr. Research Engineer'
list1 = list1.split()

n_gram = []
for i in range(len(list1)-2+1):
    tup = ' '.join(list1[i:i+2])
    n_gram.append(tup)
print(n_gram)


# Break a list into chunks of 3 (mimicking 3 partitions) and find the sum of each chunk
# num_partition = 3

list1 = [1,2,3,4,5,6,7,8,9]
sublist = []
sum_sublist = []
for x in range(0, len(list1),3):
    sublist.append(list1[x:x+3])
    sum_sublist.append(sum(list1[x:x+3]))
print(sublist)
print(sum_sublist)


# Same chunking on words; split the sentence itself (calling str() on a list would keep its brackets and quotes)
list1 = 'I am applying for Jr. Research Engineer'.split()
sublist = []
len_sublist = []
for x in range(0, len(list1),3):
    sublist.append(list1[x:x+3])
    len_sublist.append(len(list1[x:x+3]))
print(sublist)
print(len_sublist)

['I am', 'am applying', 'applying for', 'for Jr.', 'Jr. Research', 'Research Engineer']
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
[6, 15, 24]
[['I', 'am', 'applying'], ['for', 'Jr.', 'Research'], ['Engineer']]
[3, 3, 1]


In [83]:
def partition_stats(partition):
    # Runs once per partition; 'partition' is an iterator over that partition's elements
    elements = list(partition)
    yield (elements, sum(elements))

list1 = [1,2,3,4,5,6,7,8,9]
list1_int = sc.parallelize(list1)
partitioned_rdd = sc.parallelize(list1, 3).mapPartitions(partition_stats)   # 3 partitions, like the chunks above
partitioned_rdd    # Lazy: nothing runs until an action such as collect() is called

PythonRDD[368] at RDD at PythonRDD.scala:53

### Sampling 

In [167]:
print(list1_int.sample(withReplacement=False , fraction=0.5, seed=1).collect())
print(list1_int.sample(withReplacement=True , fraction=0.5, seed=1).collect())   # With replacement an element can be drawn more than once, e.g. 3 appears twice

[5, 6, 7, 8]
[2, 3, 3]
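Note that fraction is the expected share of elements, not an exact count. As a rough model (sample_model and poisson_draw are hypothetical helpers): without replacement, each element is kept independently with probability fraction (Bernoulli sampling); with replacement, each element is emitted a random number of times whose mean is fraction (Poisson sampling), which is how the duplicate 3 above can appear. The actual values differ from Spark's even with the same seed.

```python
import math
import random

def poisson_draw(rng, mean):
    # Knuth's method: multiply uniforms until the product drops to exp(-mean) or below
    limit = math.exp(-mean)
    count, product = 0, rng.random()
    while product > limit:
        count += 1
        product *= rng.random()
    return count

def sample_model(data, with_replacement, fraction, seed=None):
    """Hypothetical model of RDD.sample on one partition; not Spark's exact random stream."""
    rng = random.Random(seed)
    if with_replacement:
        # Each element is emitted Poisson(fraction) times, so duplicates are possible
        return [x for x in data for _ in range(poisson_draw(rng, fraction))]
    # Each element is kept independently with probability `fraction`
    return [x for x in data if rng.random() < fraction]

data = list(range(1, 10))
print(sample_model(data, False, 0.5, seed=1))
print(sample_model(data, True, 0.5, seed=1))
```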


### SET OPERATORS

In [7]:
list1 = [1,2,3,4,5,6,7,8,9]
list2 = [10,11,12,13,14,15,16,1,2]
print(len(list1)==len(list2))

list1_int = sc.parallelize(list1)                   
list2_int = sc.parallelize(list2)   

print('Union Operator bring ALL Elements : ')
union_rdd = list1_int.union(list2_int)
print(union_rdd.collect())
print(union_rdd.count())

print('*****'*15)
print('Intersection Operator brings COMMON elements: ')
intersection_rdd = list1_int.intersection(list2_int)
print(intersection_rdd.collect())
print(intersection_rdd.count())

print('*****'*15)
print('DISTINCT brings UNIQUE elements: ')
Unique_rdd = union_rdd.distinct()
print(Unique_rdd.collect())
print(Unique_rdd.count())

True
Union Operator bring ALL Elements : 
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 1, 2]
18
***************************************************************************
Intersection Operator brings COMMON elements: 
[1, 2]
2
***************************************************************************
DISTINCT brings UNIQUE elements: 
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
16


### Group by key vs Reduce by key vs Sort by key
- All of these functions work on pair RDDs, i.e. RDDs of (key, value) tuples.

#### groupByKey 
- Values are not combined before the shuffle: every (key, value) pair is transferred as it is. It does not use a map-side combiner (like the MapReduce combiner), so the heavy shuffling degrades performance, and it becomes even costlier when one partition holds much more data than the others. Takes input as (key, value) pairs.

In [50]:
list1 = [1,2,1,4,5,2,7,8,7]
list1_int = sc.parallelize(list1)                   
group_by_key_rdd = list1_int.map(lambda x : (x,1))  \
                            .groupByKey() \
                            .map(lambda x: (x[0], sum(x[1])))

print(group_by_key_rdd.collect())

print('*****'*15)

list1 = ['aman deep samra aman hello python hello']
list1_str = sc.parallelize(list1)  
group_by_key_rdd = list1_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,1))\
                            .groupByKey()\
                            .map(lambda x : (x[0], sum(x[1])))
         
print(group_by_key_rdd.collect())

[(1, 2), (2, 2), (4, 1), (5, 1), (7, 2), (8, 1)]
***************************************************************************
[('python', 1), ('hello', 2), ('aman', 2), ('deep', 1), ('samra', 1)]


#### reduceByKey 
- Before shuffling, values with the same key are first combined within each partition (a map-side combine, like the MapReduce combiner), and only these partial results are transferred to the new executors. Much faster than groupByKey because much less data is shuffled. Good performance. Takes input as (key, value) pairs.
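The difference in shuffle volume can be counted with a pure-Python model of two partitions of (word, 1) pairs. The helper names are hypothetical, and real Spark shuffles serialized records between executors; the point is only the record count: groupByKey ships every pair, whereas reduceByKey first combines inside each partition and ships one partial result per key per partition.

```python
from operator import add

# Two partitions of (word, 1) pairs, as produced by map(lambda x: (x, 1))
partitions = [
    [('aman', 1), ('aman', 1), ('deep', 1), ('aman', 1)],
    [('deep', 1), ('aman', 1), ('aman', 1)],
]

def group_by_key_shuffle(parts):
    # No map-side combine: every single record crosses the shuffle
    return [pair for part in parts for pair in part]

def reduce_by_key_shuffle(parts, func):
    # Map-side combine: reduce inside each partition, ship one record per key per partition
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled

def final_reduce(records, func):
    # Reduce side: merge whatever arrived for each key
    result = {}
    for key, value in records:
        result[key] = func(result[key], value) if key in result else value
    return result

print(len(group_by_key_shuffle(partitions)))                      # 7 records shuffled
print(len(reduce_by_key_shuffle(partitions, add)))                # 4 records shuffled
print(final_reduce(reduce_by_key_shuffle(partitions, add), add))  # {'aman': 5, 'deep': 2}
```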

In [51]:
list1 = [1,2,1,4,5,2,7,8,7]
list1_int = sc.parallelize(list1)                   
reduce_by_key_rdd = list1_int.map(lambda x : (x,1))\
                            .reduceByKey(lambda x,y : (x + y)) 

print(reduce_by_key_rdd.collect())

print('*****'*15)

list1 = ['aman deep samra aman hello python hello']
list1_str = sc.parallelize(list1)  
reduce_by_key_rdd = list1_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,1))\
                            .reduceByKey(lambda x,y : (x + y)) 

print(reduce_by_key_rdd.collect())

[(1, 2), (2, 2), (4, 1), (5, 1), (7, 2), (8, 1)]
***************************************************************************
[('python', 1), ('hello', 2), ('aman', 2), ('deep', 1), ('samra', 1)]


#### sortByKey
- Sorts a pair RDD by key in ascending or descending order. Ascending is the default; pass ascending=False for descending order.

In [59]:
list1 = [1,2,1,4,5,2,7,8,7]
list1_int = sc.parallelize(list1)                   
sort_by_key_rdd = list1_int.map(lambda x : (x,1))\
                            .sortByKey(ascending= False)

print(sort_by_key_rdd.collect())

print('*****'*15)

list1 = ['aman deep samra aman hello python hello']
list1_str = sc.parallelize(list1)  
sort_by_key_rdd = list1_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,1))\
                            .sortByKey(ascending= False)

print(sort_by_key_rdd.collect())

[(8, 1), (7, 1), (7, 1), (5, 1), (4, 1), (2, 1), (2, 1), (1, 1), (1, 1)]
***************************************************************************
[('samra', 1), ('python', 1), ('hello', 1), ('hello', 1), ('deep', 1), ('aman', 1), ('aman', 1)]


In [69]:
list1 = [1,2,1,4,5,2,7,8,7]
list1_int = sc.parallelize(list1)                   
sorted_pairs_rdd = list1_int.map(lambda x : (x,x**2))\
                            .sortByKey(ascending= False)

print(sorted_pairs_rdd.collect())

distinct_rdd = sorted_pairs_rdd.distinct()    # distinct() shuffles the data, so the sort order is not preserved
print(distinct_rdd.collect())

[(8, 64), (7, 49), (7, 49), (5, 25), (4, 16), (2, 4), (2, 4), (1, 1), (1, 1)]
[(5, 25), (8, 64), (7, 49), (4, 16), (1, 1), (2, 4)]


#### aggregateByKey 
- aggregateByKey - Takes 3 arguments: a zero value (the initial accumulator), a sequence (combine) function and a final merge function
- Zero value - The neutral starting value of the accumulator for each key: 0 for a sum, 1 for a product
- Sequence/Combine function - Combines the values within a single partition into the accumulator and passes the intermediate result to the final merge function
- Final merge function - Merges the accumulators from all partitions, taking the outputs of the sequence function as input
- The type of the output value can differ from the type of the input value, whereas reduceByKey always outputs the same type as its input values
- reduceByKey is a special case of aggregateByKey in which the same function is used within and across partitions
- Both help with parallel computation because they apply the combine function inside each partition before shuffling
- Useful when values split across partitions need a per-partition operation followed by an aggregation
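How the three arguments cooperate can be sketched in pure Python. aggregate_by_key is a hypothetical stand-in for RDD.aggregateByKey, partitions are modelled as lists of (key, value) pairs, and an immutable zero value is assumed because the model reuses it for every key. The accumulator is a (sum, count) tuple, so the output type differs from the int input values; with zero value 0 and addition for both functions, it gives the same totals as the reduceByKey output shown in the next cell.

```python
from operator import add

def aggregate_by_key(partitions, zero_value, seq_func, comb_func):
    """Hypothetical model of RDD.aggregateByKey over a list of partitions."""
    # 1) seq_func folds each value into a per-key accumulator, separately in every partition
    per_partition = []
    for part in partitions:
        accs = {}
        for key, value in part:
            accs[key] = seq_func(accs.get(key, zero_value), value)
        per_partition.append(accs)
    # 2) comb_func merges the accumulators of the same key coming from different partitions
    result = {}
    for accs in per_partition:
        for key, acc in accs.items():
            result[key] = comb_func(result[key], acc) if key in result else acc
    return result

# (word, len(word) * 10) pairs split over two partitions
partitions = [
    [('aman', 40), ('saurabh', 70)],
    [('aman', 40), ('manish', 60), ('saurabh', 70)],
]

# int values in, (sum, count) tuples out
sum_count = aggregate_by_key(
    partitions, (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(sum_count)   # {'aman': (80, 2), 'saurabh': (140, 2), 'manish': (60, 1)}

averages = {key: total / count for key, (total, count) in sum_count.items()}
print(averages)    # {'aman': 40.0, 'saurabh': 70.0, 'manish': 60.0}

print(aggregate_by_key(partitions, 0, add, add))   # {'aman': 80, 'saurabh': 140, 'manish': 60}
```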


In [7]:
list1 = ['aman saurabh manish aman saurabh']
list1_str = sc.parallelize(list1)  
pair_rdd = list1_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,len(x)*10))

print('reduceByKey :')
print(pair_rdd.reduceByKey(lambda x,y : (x + y)).collect())

print('aggregateByKey example 1 and 2 :')
# seqFunc receives (accumulator, value); combFunc receives two accumulators
print(pair_rdd.aggregateByKey(0, lambda acc, value: acc + value, lambda acc1, acc2: acc1 + acc2).collect())
print(pair_rdd.aggregateByKey(0, lambda acc, value: acc + int(value), lambda acc1, acc2: acc1 + acc2).collect())

reduceByKey :
[('aman', 80), ('saurabh', 140), ('manish', 60)]
aggregateByKey example 1 and 2 :
[('aman', 80), ('saurabh', 140), ('manish', 60)]
[('aman', 80), ('saurabh', 140), ('manish', 60)]


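#### keyBy
- keyBy(f) turns every element x into a (f(x), x) pair, i.e. it builds a pair RDD whose key is computed from the element.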

In [24]:
list1 = ['aman saurabh manish deep clarke']

list_rdd = sc.parallelize(list1)
split_list_rdd = list_rdd.flatMap(lambda x : x.split(' '))   # Flattened 1D Array
#list_rdd.map(lambda x : x.split(' ')).collect() # 2D Array 
split_list_rdd.keyBy(lambda x : x[0]).collect()

[('a', 'aman'),
 ('s', 'saurabh'),
 ('m', 'manish'),
 ('d', 'deep'),
 ('c', 'clarke')]

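### ZIP
- RDD.zip pairs the i-th element of one RDD with the i-th element of the other. Both RDDs must have the same number of partitions and the same number of elements in each partition.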

In [68]:
first_name = ['aman', 'saurabh', 'manish']
sur_name = ['samra', 'mishra', 'dhull']
full_name = list(zip(first_name,sur_name))
print(f'Python ZIP function results in : \n {full_name} \n')
full_name = [' '.join(i) for i in full_name]
print(f'Python ZIP function results in : \n {full_name} \n')

print('****'*15)

firstname_rdd = sc.parallelize(first_name)
surname_rdd = sc.parallelize(sur_name)
full_name = firstname_rdd.zip(surname_rdd)
print(f'PySpark RDD ZIP function results in : \n {full_name.collect()} \n')
full_name = [' '.join(i) for i in full_name.collect()]
print(f'PySpark RDD ZIP function results in : \n {full_name} \n')

Python ZIP function results in : 
 [('aman', 'samra'), ('saurabh', 'mishra'), ('manish', 'dhull')] 

Python ZIP function results in : 
 ['aman samra', 'saurabh mishra', 'manish dhull'] 

************************************************************
PySpark RDD ZIP function results in : 
 [('aman', 'samra'), ('saurabh', 'mishra'), ('manish', 'dhull')] 

PySpark RDD ZIP function results in : 
 ['aman samra', 'saurabh mishra', 'manish dhull'] 



### JOINS
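- Inner Join - Joins the RDDs on the basis of their KEYS and keeps only keys present in both
- Left Join - Takes all records from the left RDD and matches them on KEY; if it doesn't find a matching key in the right RDD, the right-hand value is None
- Right Join - Takes all records from the right RDD and matches them on KEY; if it doesn't find a matching key in the left RDD, the left-hand value is None
- If the inputs are (K, V) and (K, W) pairs, the output is (K, (V, W)) pairs; a key that appears several times produces one output record per matching combination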
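The key-matching rules, including the duplicated aman rows in the second join cell, can be reproduced with a pure-Python model over lists of pairs. The helper functions are hypothetical, and Spark does not guarantee this output order.

```python
def inner_join(left, right):
    # One output record for every matching (left value, right value) combination
    return [(k, (v, w)) for k, v in left for k2, w in right if k == k2]

def left_outer_join(left, right):
    out = []
    for k, v in left:
        matches = [w for k2, w in right if k2 == k]
        # Unmatched left keys still appear, paired with None
        out.extend((k, (v, w)) for w in (matches or [None]))
    return out

def right_outer_join(left, right):
    # Mirror image of the left outer join, with the pair order swapped back
    return [(k, (v, w)) for k, (w, v) in left_outer_join(right, left)]

left_pairs = [('aman', 40), ('saurab', 60), ('manis', 50), ('aman', 40)]
right_pairs = [('deep', 40), ('sharma', 60), ('aman', 40)]
print(inner_join(left_pairs, right_pairs))
# [('aman', (40, 40)), ('aman', (40, 40))]
print(right_outer_join(left_pairs, right_pairs))
# [('deep', (None, 40)), ('sharma', (None, 60)), ('aman', (40, 40)), ('aman', (40, 40))]
```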

In [15]:
list1 = ['aman saurab manis']
list2 = ['deep sharma aman']
list1_str = sc.parallelize(list1)  
rdd_1 = list1_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,len(x)*10))

list2_str = sc.parallelize(list2)  
rdd_2 = list2_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,len(x)*10))

print(rdd_1.collect())
print(rdd_2.collect())

print('*****'*15)

inner_join_key = rdd_1.join(rdd_2).collect()
left_join_key = rdd_1.leftOuterJoin(rdd_2).collect()
right_join_key = rdd_1.rightOuterJoin(rdd_2).collect()

print(f'INNER join on 2 rdd by key results in : {inner_join_key} \n')
print(f'LEFT join on 2 rdd by key results in : {left_join_key} \n')
print(f'RIGHT join on 2 rdd by key results in : {right_join_key} \n')

[('aman', 40), ('saurab', 60), ('manis', 50)]
[('deep', 40), ('sharma', 60), ('aman', 40)]
***************************************************************************
INNER join on 2 rdd by key results in : [('aman', (40, 40))] 

LEFT join on 2 rdd by key results in : [('saurab', (60, None)), ('manis', (50, None)), ('aman', (40, 40))] 

RIGHT join on 2 rdd by key results in : [('sharma', (None, 60)), ('aman', (40, 40)), ('deep', (None, 40))] 



In [20]:
list1 = ['aman saurab manis aman']
list2 = ['deep sharma aman']
list1_str = sc.parallelize(list1)  
rdd_1 = list1_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,len(x)*10))

list2_str = sc.parallelize(list2)  
rdd_2 = list2_str.flatMap(lambda x : x.split(' '))\
                            .map(lambda x : (x,len(x)*10))

print(rdd_1.collect())
print(rdd_2.collect())

print('*****'*15)

inner_join_key = rdd_1.join(rdd_2)
left_join_key = rdd_1.leftOuterJoin(rdd_2)
right_join_key = rdd_1.rightOuterJoin(rdd_2)

print(f'INNER join on 2 rdd by key results in : {inner_join_key.collect()} \n')
print(f'LEFT join on 2 rdd by key results in : {left_join_key.collect()} \n')
print(f'RIGHT join on 2 rdd by key results in : {right_join_key.collect()} \n')

[('aman', 40), ('saurab', 60), ('manis', 50), ('aman', 40)]
[('deep', 40), ('sharma', 60), ('aman', 40)]
***************************************************************************
INNER join on 2 rdd by key results in : [('aman', (40, 40)), ('aman', (40, 40))] 

LEFT join on 2 rdd by key results in : [('saurab', (60, None)), ('manis', (50, None)), ('aman', (40, 40)), ('aman', (40, 40))] 

RIGHT join on 2 rdd by key results in : [('sharma', (None, 60)), ('aman', (40, 40)), ('aman', (40, 40)), ('deep', (None, 40))] 



In [38]:
print(inner_join_key.collect())
print(inner_join_key.reduceByKey(lambda x,y : (x + y)).collect())   # x + y on tuples concatenates them
print(inner_join_key.groupByKey().collect())                         # Values come back as a ResultIterable


print(inner_join_key.map(lambda x : (x[0])).collect())              # Keys only
print(inner_join_key.map(lambda x : (x[1][1])).collect())           # Value coming from rdd_2
print(inner_join_key.map(lambda x : (x[0] , x[1][1])).collect())    # (key, value from rdd_2)

print(inner_join_key.map(lambda x : (x[0] , x[1][1]))\
                    .reduceByKey(lambda x,y : (x + y)).collect())   # Numeric sum per key

[('aman', (40, 40)), ('aman', (40, 40))]
[('aman', (40, 40, 40, 40))]
[('aman', <pyspark.resultiterable.ResultIterable object at 0x7f68d42ed5f8>)]
['aman', 'aman']
[40, 40]
[('aman', 40), ('aman', 40)]
[('aman', 80)]


#### CoGroup
- Very similar to the relational database FULL OUTER JOIN: every key from either RDD appears in the result
- In Spark's (Scala) implementation, join is built on cogroup: the join just breaks the iterables from cogroup into one tuple per matching pair of values
- Instead of flattening the result into one record per matching pair, cogroup returns, for each key, a tuple of iterables (one per RDD) holding all of that key's values
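The relationship between cogroup and join described above can be sketched in pure Python. The helpers are hypothetical, lists stand in for Spark's ResultIterable objects, and Spark does not guarantee this key order.

```python
def cogroup(left, right):
    """Per key: (values from left, values from right), like a full outer grouping."""
    keys = list(dict.fromkeys([k for k, _ in left] + [k for k, _ in right]))
    return {k: ([v for k2, v in left if k2 == k], [w for k2, w in right if k2 == k]) for k in keys}

def join_via_cogroup(left, right):
    # Break each key's two iterables into every (v, w) pair; keys missing on either side vanish
    return [(k, (v, w)) for k, (vs, ws) in cogroup(left, right).items() for v in vs for w in ws]

tmp_list1 = [('spark', 1), ('hdfs', 2), ('flink', 3), ('kafka', 4), ('hive', 5)]
tmp_list2 = [('spark', 2), ('hdfs', 3), ('flink', 4), ('d', 5), ('hive', 6), ('teradata', 7)]
grouped = cogroup(tmp_list1, tmp_list2)
print(grouped['kafka'], grouped['teradata'])   # ([4], []) ([], [7])
print(join_via_cogroup(tmp_list1, tmp_list2))
# [('spark', (1, 2)), ('hdfs', (2, 3)), ('flink', (3, 4)), ('hive', (5, 6))]
```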

In [36]:
tmp_list1 = [('spark', 1), ('hdfs', 2), ('flink', 3), ('kafka', 4), ('hive', 5)]
tmp_list2 = [('spark', 2), ('hdfs', 3), ('flink', 4), ('d', 5), ('hive', 6),('teradata', 7)]

tmp_list1_rdd = sc.parallelize(tmp_list1)
tmp_list2_rdd = sc.parallelize(tmp_list2)

cogroup_rdd = tmp_list1_rdd.cogroup(tmp_list2_rdd)

for k,v in cogroup_rdd.collect():
    print(k ,  tuple(map(list,v)))

print('*****'*15)

print(cogroup_rdd.map(lambda x : x[0]).collect())   # Returns all Keys
print('*****'*15)
print(cogroup_rdd.map(lambda x : x[1]).collect())   # Returns all iterables
print('*****'*15)
print(cogroup_rdd.map(lambda x : x[1][0]).collect())   # Values from the first RDD only (still iterables)
print('*****'*15)
print(cogroup_rdd.map(lambda x : sum(x[1][0])).collect())   # Sum of the values from the first RDD
print('*****'*15)
print(cogroup_rdd.map(lambda x : sum(x[1][1])).collect())   # Sum of the values from the second RDD
print('*****'*15)
print(cogroup_rdd.map(lambda x : (sum(x[1][0])) + sum(x[1][1])).collect())   # Sum of the values from both RDDs
print('*****'*15)
print(cogroup_rdd.map(lambda x : (x[0], (sum(x[1][0])) + sum(x[1][1]))).collect())   # Aggregation: (key, sum of values from both RDDs)

hive ([5], [6])
d ([], [5])
hdfs ([2], [3])
spark ([1], [2])
teradata ([], [7])
flink ([3], [4])
kafka ([4], [])
***************************************************************************
['hive', 'd', 'hdfs', 'spark', 'teradata', 'flink', 'kafka']
***************************************************************************
[(<pyspark.resultiterable.ResultIterable object at 0x7fc3942c5780>, <pyspark.resultiterable.ResultIterable object at 0x7fc3942c5e48>), (<pyspark.resultiterable.ResultIterable object at 0x7fc3942c5978>, <pyspark.resultiterable.ResultIterable object at 0x7fc3942c5b38>), (<pyspark.resultiterable.ResultIterable object at 0x7fc3942c5e10>, <pyspark.resultiterable.ResultIterable object at 0x7fc3942c5470>), (<pyspark.resultiterable.ResultIterable object at 0x7fc3942c57f0>, <pyspark.resultiterable.ResultIterable object at 0x7fc3942c5048>), (<pyspark.resultiterable.ResultIterable object at 0x7fc3942c5710>, <pyspark.resultiterable.ResultIterable object at 0x7fc3942c5f28>), (<p

#### Cartesian Join
- Each element of one RDD is paired with every element of the other RDD (a cross join), so the result has count(rdd1) * count(rdd2) elements

In [82]:
cartesian_rdd = tmp_list1_rdd.cartesian(tmp_list2_rdd)
print(cartesian_rdd.collect()[:6])   # Fetching records of spark as a key and its cross join with other keys 
print('*****'*15)
print(cartesian_rdd.map(lambda x : (x[0][1] , x[1][1]) ).collect()[0:6])   # Fetching tuples 
print('*****'*15)
print(cartesian_rdd.map(lambda x : (x[0][1] + x[1][1]) ).collect()[0:6])   # Summing values in tuples
print('*****'*15)
print(cartesian_rdd.map(lambda x : (x[0][0], (x[0][1] + x[1][1])) ).collect()[0:6]) # Key - Summing values 

[(('spark', 1), ('spark', 2)), (('spark', 1), ('hdfs', 3)), (('spark', 1), ('flink', 4)), (('spark', 1), ('d', 5)), (('spark', 1), ('hive', 6)), (('spark', 1), ('teradata', 7))]
***************************************************************************
[(1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7)]
***************************************************************************
[3, 4, 5, 6, 7, 8]
***************************************************************************
[('spark', 3), ('spark', 4), ('spark', 5), ('spark', 6), ('spark', 7), ('spark', 8)]


### REDUCE - Actions (aggregate, take, sum, reduce, collect, max, min, stdev, mean, countByKey)
- We don't need to call collect here: reduce is an action, not a transformation, so it returns its result directly to the driver
- Calling it triggers execution of the DAG that produces the RDD
- Each partition is reduced locally, and the partial results are then combined on the driver into a single value; no shuffle is needed
- reduce aggregates all elements of the RDD into one value; to aggregate the values of each key, use reduceByKey
- collect: Returns all elements to the driver as a Python list (use with care on large RDDs)
- count: Returns the number of elements in an RDD
- countByValue: Returns a dictionary of {value: count}, i.e. a count per unique value
- take, top: Fetch a few values, somewhat like the Unix head command: take(n) returns the first n elements and top(n) the n largest
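Because reduce runs the function inside each partition first and then on the partial results, the function should be associative and commutative. The pure-Python model below (rdd_style_reduce is hypothetical) shows that addition gives the same answer for any partitioning, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

def rdd_style_reduce(partitions, func):
    """Model: reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2], [3, 4, 5]]
print(rdd_style_reduce(one_partition, add), rdd_style_reduce(two_partitions, add))  # 15 15
print(rdd_style_reduce(one_partition, sub), rdd_style_reduce(two_partitions, sub))  # -13 5
```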

In [46]:
python_list = [i for i in range(1,6)]
print(python_list)
rdd = sc.parallelize(python_list)
print(rdd.reduce(lambda x,y : x + y))
print(rdd.reduce(lambda x,y : x * y))

[1, 2, 3, 4, 5]
15
120


In [90]:
list1 = ['aman', 'saurabh', 'manish', 'deep', 'clarke']
list_rdd = sc.parallelize(list1)
cnt = list_rdd.keyBy(lambda x : (x[0]))
cnt.countByKey()    # Returns a dict (defaultdict) of {key: count} to the driver


#list_rdd.countByKey() 

defaultdict(int, {'a': 1, 's': 1, 'm': 1, 'd': 1, 'c': 1})

## PYTHON

### list = [ ]  
- Written with square brackets. Mutable, i.e. it can be changed in place, and it generally uses more memory than a tuple with the same elements
- append, extend, insert to add elements
- del, remove, pop to delete elements from a list
### tuple = ( )  
- Written with round brackets (parentheses); generally smaller in memory than the equivalent list
- Like a list, it can hold elements of multiple types
- It is immutable, like an RDD
- A tuple cannot be changed in place, but concatenating tuples creates a new tuple that can be assigned to another name
- Supported operations: create, access (index/slice), concatenate into a new tuple, and delete the entire tuple with del tuplename; individual elements cannot be deleted
### dict = { }   
- Curly brackets holding key: value pairs. Values can be any object, including lists and tuples; keys must be hashable, so a tuple can be a key but a list cannot
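A quick check of the mutability, hashability and memory claims above with the standard library. sys.getsizeof reports CPython's object size in bytes; exact numbers vary by Python version and platform, so only the comparison is printed.

```python
import sys

as_list = [1, 2, 3, 'aman']
as_tuple = (1, 2, 3, 'aman')

# CPython stores a tuple more compactly than a list with the same elements
print(sys.getsizeof(as_tuple) < sys.getsizeof(as_list))   # True on CPython

as_list.append(4)               # lists are mutable
try:
    as_tuple[0] = 99            # tuples are not
except TypeError as err:
    print(f'TypeError: {err}')  # 'tuple' object does not support item assignment

lookup = {('spark', 2): 'tuple key is fine'}   # tuples are hashable, so they can be dict keys
try:
    lookup[['spark', 2]] = 'list key'          # lists are unhashable
except TypeError as err:
    print(f'TypeError: {err}')  # unhashable type: 'list'
```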


In [111]:
tuple1 = (1,2,3,'aman',4,5,'samra')
print(f'Accessing tuple - Fifth element (index 4) of tuple is : {tuple1[4]} \n')

tuple2 = (6,7,8,'hello',9,10,'world')

concated_tuple = tuple1+tuple2      # Concatenation creates a NEW tuple
print(f'Concatenated tuple  : {concated_tuple} \n')

print("Deleting ENTIRE tuple (elements can't be deleted individually) : \n")
del concated_tuple

print(concated_tuple)   # Raises NameError: the name no longer exists after del

Accessing tuple - Fifth element (index 4) of tuple is : 4 

Concatenated tuple  : (1, 2, 3, 'aman', 4, 5, 'samra', 6, 7, 8, 'hello', 9, 10, 'world') 

Deleting ENTIRE tuple (elements can't be deleted individually) : 



NameError: name 'concated_tuple' is not defined

In [104]:
tuple1 = (1)  # Here the interpreter does not treat this as a tuple: parentheses alone only group, so it is an integer
print(f'Strange behaviour of Tuple : {type(tuple1)} \n')

tuple1 = (1,)  # The trailing COMMA makes it a tuple
print(f'Strange behaviour of Tuple with COMMA : {type(tuple1)} \n')

tuple1 = 1,2,3  # Commas, not parentheses, create a tuple
print(f'Strange behaviour of Tuple WITHOUT assigning Simple Brackets : {type(tuple1)} \n')

Strange behaviour of Tuple : <class 'int'> 

Strange behaviour of Tuple with COMMA : <class 'tuple'> 

Strange behaviour of Tuple WITHOUT assigning Simple Brackets : <class 'tuple'> 

