In [1]:
import findspark

# findspark.init() locates the Spark installation and adds pyspark to
# sys.path, so it has to run before anything from pyspark is imported.
findspark.init()

from pyspark.sql import SparkSession

In [2]:
# Reuse the active SparkSession if one exists, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()

# The SparkContext is the entry point for the RDD API used below (sc.parallelize).
sc = spark.sparkContext

## RDD actions

In [3]:
# parallelize() turns a local Python collection into an RDD

data = [1,2,3,4,5]
rDD = sc.parallelize(data, 4)   # second argument: number of partitions (numSlices)
rDD.collect()   # collect() is an action: it returns all elements to the driver

[1, 2, 3, 4, 5]

In [4]:
# collect() brings the RDD's elements back to the driver as a plain Python list
A = sc.parallelize([2,4,7])
L = A.collect()
print(type(L))
print(L)

<class 'list'>
[2, 4, 7]


In [5]:
A.reduce(lambda x,y: x*y) # RDD action: combines the elements pairwise, here into their product (2*4*7)

56

In [6]:
rdd = sc.parallelize([5,3,1,2])
rdd.takeOrdered(3) # RDD action: returns the 3 smallest elements in ascending order

[1, 2, 3]

In [7]:
rdd.count() # RDD action: number of elements in the RDD

4

In [8]:
rdd.take(2) # RDD action: first 2 elements in the RDD's own order (no sorting)

[5, 3]

In [9]:
# saveAsTextFile() would write the RDD's contents under the given directory.
# NOTE(review): left disabled - presumably to avoid writing files on every run; confirm or remove.
# rdd.saveAsTextFile("text1/")

In [10]:
words = ['this', 'is', 'the', 'best', 'mac', 'ever']
wordRDD = sc.parallelize(words)
# Keep the shorter word of each pair; when the lengths tie, the second argument (v) wins.
wordRDD.reduce(lambda w,v: w if len(w) < len(v) else v)

'is'

In [11]:
# NOTE: subtraction is neither commutative nor associative, while reduce() is
# documented to require an operator that is both. The result can therefore
# depend on how the elements are split across partitions; -9 is simply the
# left-to-right value ((1 - 3) - 5) - 2.
B = sc.parallelize([1,3,5,2])
B.reduce(lambda x,y: x - y)

-9

In [12]:
# Sample words; some share the same length ('this'/'that', 'round'/'sharp').
words1 = ['this', 'that', 'rectangle', 'round', 'sharp']
wordRDD1 = sc.parallelize(words1)

def lesserThan(x, y):
    """Return the "smaller" of two words.

    The shorter word wins. When both words have the same length, the one
    that sorts first lexicographically is returned.
    """
    len_x, len_y = len(x), len(y)
    # Different lengths: the shorter argument is the answer.
    if len_x != len_y:
        return x if len_x < len_y else y
    # Same length: fall back to lexicographic order.
    return x if x < y else y
# Reduce with the named function: shortest word, ties resolved lexicographically.
wordRDD1.reduce(lesserThan)

'that'

## Spark Partitions, Coalesce and Repartition

In [13]:
# No numSlices is passed, so Spark chooses the partition count itself
# (compare with sc.defaultParallelism further down).
A = sc.parallelize(range(100000))
print(A.getNumPartitions())

8


In [14]:
# repartition(6) returns a new RDD whose data is spread over 6 partitions.
D = A.repartition(6)
print(D.getNumPartitions())

6


In [15]:
# coalesce(4) returns a new RDD with the partitions of A merged down to 4.
D = A.coalesce(4)
print(D.getNumPartitions())

4


In [16]:
# numSlices sets the number of partitions explicitly.
A = sc.parallelize(range(1000000), numSlices=12)
print(A.getNumPartitions())

12


In [17]:
# Default partition count used when parallelize() is called without numSlices.
sc.defaultParallelism

8

## RDD Transformations

In [18]:
# Multiply every element by a scalar

rdd = sc.parallelize([1,2,3,4])
rdd.map(lambda x:x*2).collect()

[2, 4, 6, 8]

In [19]:
# Keep only the elements that satisfy the condition (here: even numbers)

rdd.filter(lambda x:x%2 == 0).collect()

[2, 4]

In [20]:
# Remove duplicates; the order of the result is not tied to the input order

rdd2 = sc.parallelize([1,4,2,2,3])
rdd2.distinct().collect()

[1, 2, 3, 4]

In [21]:
# Build a large RDD by repeating the pattern [1, 2, 3, 4] until it holds n elements.
n = 10 ** 6
B = sc.parallelize([1, 2, 3, 4] * (n // 4))

In [22]:
def greaterthan(x, threshold=3):
    """Return True when ``x`` is strictly greater than ``threshold``.

    Parameters
    ----------
    x : value to test.
    threshold : cut-off to compare against. Defaults to 3, the value that
        was previously hard-coded, so one-argument calls such as
        ``rdd.filter(greaterthan)`` behave exactly as before.
    """
    return x > threshold
# Same filter expressed two ways: first with the named predicate ...
print('the number of elements in B that are > 3 =', B.filter(greaterthan).count())

# ... then with an equivalent inline lambda (its parameter is called `value`
# so that it does not shadow the notebook-level `n`).
print('the number of elements in B that are > 3 =', B.filter(lambda value: value > 3).count())

the number of elements in B that are > 3 = 250000
the number of elements in B that are > 3 = 250000


In [23]:
# Removing the duplicate elements of DuplicateRDD with distinct() gives an RDD of unique values

DuplicateRDD = sc.parallelize([1,1,2,2,3,3])
print('DuplicateRDD=',DuplicateRDD.collect())
print('DistinctRDD=',DuplicateRDD.distinct().collect())

DuplicateRDD= [1, 1, 2, 2, 3, 3]
DistinctRDD= [1, 2, 3]


In [24]:
text=["you are my sunshine", "my only sunshine"]
text_file = sc.parallelize(text)

# map: one output element per line, so each line becomes its own list of words

print('map:', text_file.map(lambda line: line.split(" ")).collect())

# flatMap: the per-line word lists are flattened into a single list of words

print('flatmap:', text_file.flatMap(lambda line: line.split(" ")).collect())

map: [['you', 'are', 'my', 'sunshine'], ['my', 'only', 'sunshine']]
flatmap: ['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']


In [25]:
# union() keeps duplicates (bag semantics); chaining distinct() gives set semantics.
rdd1 = sc.parallelize([1,1,2,3])
rdd2 = sc.parallelize(['a', 'b', 1])
print('rdd1=', rdd1.collect())
print('rdd2=', rdd2.collect())
print('union as bags =', rdd1.union(rdd2).collect())
print('union as sets =', rdd1.union(rdd2).distinct().collect())

rdd1= [1, 1, 2, 3]
rdd2= ['a', 'b', 1]
union as bags = [1, 1, 2, 3, 'a', 'b', 1]
union as sets = [1, 2, 3, 'b', 'a']


## Spark pair RDD

In [26]:
# A pair RDD is an RDD of (key, value) tuples; here each x becomes (x, x*x).
regular_rdd = sc.parallelize([1,2,3,4,2,5,6])
pair_rdd = regular_rdd.map(lambda x: (x, x*x))
print(pair_rdd.collect())

[(1, 1), (2, 4), (3, 9), (4, 16), (2, 4), (5, 25), (6, 36)]


In [27]:
# reduceByKey merges the values that share a key using the given function (here: sum).
rdd = sc.parallelize([(1,2), (2, 4), (2, 6)])
print("Original RDD :", rdd.collect())
print("After transformation :", rdd.reduceByKey(lambda a,b: a+b).collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation : [(1, 2), (2, 10)]


In [28]:
# sortByKey orders the pairs by key. This sample is already in key order,
# so the output looks identical to the input.
rdd = sc.parallelize([(1,2), (1,4), (3,6)])
print("original RDD: ", rdd.collect())
print("After transformation :", rdd.sortByKey().collect())

original RDD:  [(1, 2), (1, 4), (3, 6)]
After transformation : [(1, 2), (1, 4), (3, 6)]


In [29]:
# groupByKey gathers every value that shares a key into one iterable per key.
rdd = sc.parallelize([(1, 2), (2, 4), (2, 6)])
print("original RDD", rdd.collect())

# Printed as-is, the grouped values show up as opaque ResultIterable objects ...
print("After transformation : ", rdd.groupByKey().collect())

# ... so materialise each group as a list to make it readable,
print("After transformation : ", rdd.groupByKey().mapValues(list).collect())

# or transform the grouped values while materialising them (each value added to itself).
print(
    "After transformation : ",
    rdd.groupByKey().mapValues(lambda values: [v + v for v in values]).collect()
)

original RDD [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, <pyspark.resultiterable.ResultIterable object at 0x000002417ED97B20>), (2, <pyspark.resultiterable.ResultIterable object at 0x000002417EE74820>)]
After transformation :  [(1, [2]), (2, [4, 6])]
After transformation :  [(1, [4]), (2, [8, 12])]


In [30]:
rdd = sc.parallelize([(1,2), (2,1), (2,2)])
print("Original RDD", rdd.collect())

# For each value v the lambda returns the list [v, v+1]; flatMapValues then
# emits one (key, element) pair per list element, keeping the original key.
print("After transformtaion: ", rdd.flatMapValues(lambda x: list(range(x, x+2))).collect())

Original RDD [(1, 2), (2, 1), (2, 2)]
After transformtaion:  [(1, 2), (1, 3), (2, 1), (2, 2), (2, 2), (2, 3)]


In [31]:
# leftOuterJoin keeps every key of rdd1; keys with no match in rdd2 get None on the right.
rdd1 = sc.parallelize([(1,2), (2,1), (2,2)])
rdd2 = sc.parallelize([(2,5), (3,1)])
print("rdd1=", rdd1.collect())
print("rdd2=", rdd2.collect())
print("Result:", rdd1.leftOuterJoin(rdd2).collect())

rdd1= [(1, 2), (2, 1), (2, 2)]
rdd2= [(2, 5), (3, 1)]
Result: [(1, (2, None)), (2, (1, 5)), (2, (2, 5))]


In [32]:
# Inner join on the rdd1/rdd2 defined in the previous cell: only keys present in both RDDs are kept.
print("rdd1=", rdd1.collect())
print("rdd2=", rdd2.collect())
print("Result:", rdd1.join(rdd2).collect())

rdd1= [(1, 2), (2, 1), (2, 2)]
rdd2= [(2, 5), (3, 1)]
Result: [(2, (1, 5)), (2, (2, 5))]
