# 1. Import

In [1]:
from pyspark.sql import SparkSession
#SparkSession
spark=SparkSession.builder.master('local').appName('practice session 1').getOrCreate()

In [2]:
spark

# 2.RDD using parallelize() for Random/adhoc data

In [3]:
spark.sparkContext.parallelize((1,2,3,4,5,6,7,8,9,10)).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [4]:
rdd1=spark.sparkContext.parallelize([('Ross',13),('shafi',21),('Ravi',19),('Ravi',34),('shaik',123),('tommy',89),('jack',35),('splash',1.9)])

In [5]:
rdd1.take(6)

[('Ross', 13),
 ('shafi', 21),
 ('Ravi', 19),
 ('Ravi', 34),
 ('shaik', 123),
 ('tommy', 89)]

# 3. textFile() : 
<h4> Read single or multiple text, csv files and returns a single Spark RDD [String] </h4>

## Spark Read single text files into a single RDD


In [6]:
users_rdd=spark.sparkContext.textFile('S:/DataSets/text/t1.txt')

In [7]:
users_rdd.count()

1

In [8]:
users_rdd.collect()

['t1, file, 1']

## Spark Read multiple text files into a single RDD

In [9]:
users_2 = spark.sparkContext.textFile("S:/DataSets/text/t1.txt,S:/DataSets/text/t2.txt,S:/DataSets/text/t3.txt,S:/DataSets/text/t4.txt")

In [10]:
## first collect the and then loop thrugh t
users_2Collect=users_2.collect()
users_2Collect

['t1, file, 1', 't2,file,2', 't3,file,3', 't4,4,file']

In [11]:

## looping through the collect
for line in users_2Collect:
    print(line)

t1, file, 1
t2,file,2
t3,file,3
t4,4,file


## Read all text files matching a pattern to single RDD

In [12]:
sh=spark.sparkContext.textFile("S:/Datasets/text/t*.txt")
sh

S:/Datasets/text/t*.txt MapPartitionsRDD[9] at textFile at <unknown>:0

In [13]:
# sh.collect()

## with partitions

In [14]:
rdd3=spark.sparkContext.textFile("S:\\Datasets\\text\\word_list_moby.txt",minPartitions=4)

In [15]:
rdd3.count()

413

In [16]:
rdd3.take(5)

['The Project Gutenberg Etext of Moby Word II by Grady Ward',
 '',
 'Copyright laws are changing all over the world, be sure to check',
 'the laws for your country before redistributing these files!!!',
 '']

# 4. wholeTextFiles() :

<h4> Reads single or multiple files and returns a single RDD[Tuple2[String, String]], where first value (_1) in a tuple is a file name and second value (_2) is content of the file. </h4>

## Read a single text file into single RDD

In [17]:
sh=spark.sparkContext.wholeTextFiles("S:/Datasets/text/t1.txt")

In [18]:
shc=sh.collect()
shc

[('file:/S:/Datasets/text/t1.txt', 't1, file, 1')]

In [19]:
for i in shc:
    print(i)

('file:/S:/Datasets/text/t1.txt', 't1, file, 1')


## Read Multiple text files into single RDD

In [20]:
sh=spark.sparkContext.wholeTextFiles("S:/Datasets/text/t1.txt,S:/Datasets/text/t2.txt,S:/Datasets/text/t3.txt,S:/Datasets/text/t4.txt")

In [21]:
shc=sh.collect()
shc

[('file:/S:/Datasets/text/t1.txt', 't1, file, 1'),
 ('file:/S:/Datasets/text/t2.txt', 't2,file,2'),
 ('file:/S:/Datasets/text/t3.txt', 't3,file,3'),
 ('file:/S:/Datasets/text/t4.txt', 't4,4,file')]

In [22]:
for i in shc:
    print(i)

('file:/S:/Datasets/text/t1.txt', 't1, file, 1')
('file:/S:/Datasets/text/t2.txt', 't2,file,2')
('file:/S:/Datasets/text/t3.txt', 't3,file,3')
('file:/S:/Datasets/text/t4.txt', 't4,4,file')


## Read all text files with pattern match into single RDD

In [23]:
sh1=spark.sparkContext.wholeTextFiles("S:/Datasets/text/*.txt")

In [24]:
#sh1c=sh.collect()
#sh1c

In [25]:
#for i in sh1c:
#    print(i)

# 5. Create empty RDD using sparkContext.emptyRDD

In [26]:

# Creates empty RDD with no partition    
rdd = spark.sparkContext.emptyRDD 
# rddString = spark.sparkContext.emptyRDD[String]



#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([],10) #This creates 10 partitions


In [27]:
#rdd.getNumPartitions()

# 6. Repartition and Coalesce

Sometimes we may need to repartition the RDD, PySpark provides two ways to repartition; 
 1. first using repartition() method which shuffles data from all nodes also called full shuffle and
 2. second coalesce() method which shuffle data from minimum nodes,

for examples if  you have data in 4 partitions and doing coalesce(2) moves data from just 2 nodes.  

Both of the functions take the number of partitions to repartition rdd as shown below.  Note that repartition() method is a very expensive operation as it shuffles data from all nodes in a cluster. 

In [28]:

reparRdd = rdd3.repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))
#Outputs: "re-partition count:4


re-partition count:4


###### repartition() or coalesce() methods also returns a new RDD.

# RDD Transformations
* Transformations are lazy operations, instead of updating an RDD, these operations return another RDD.

In [29]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()




data = ("Project Gutenberg’s","Alice’s Adventures in Wonderland", "Project Gutenberg’s",
        "Adventures in Wonderland","Project Gutenberg’s")


rdd = spark.sparkContext.parallelize(data)

#Create RDD from external Data source
#rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

for element in rdd.collect():
    print(element)


Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s


In [30]:
                                               #map   
    
rdd2=rdd.map(lambda x: x.split(" "))
# for element in rdd2.collect():
#     print(element)
rdd2.collect()

[['Project', 'Gutenberg’s'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s'],
 ['Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s']]

In [31]:
                                                #Flatmap   
    
rdd2=rdd.flatMap(lambda x: x.split(" "))
# for element in rdd2.collect():
#     print(element)
rdd2.collect()

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s']

In [32]:
                                                #map
rdd3=rdd2.map(lambda x: (x,1))
for element in rdd3.collect():
    print(element)


('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [33]:
                                                #reduceByKey
rdd4=rdd3.reduceByKey(lambda a,b: a+b)
for element in rdd4.collect():
    print(element)

('Project', 3)
('Gutenberg’s', 3)
('Alice’s', 1)
('Adventures', 2)
('in', 2)
('Wonderland', 2)


In [34]:
                                                #sortByKey
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey()
for element in rdd5.collect():
    print(element)

(1, 'Alice’s')
(2, 'Adventures')
(2, 'in')
(2, 'Wonderland')
(3, 'Project')
(3, 'Gutenberg’s')


In [35]:
                                            #filter
rdd6 = rdd5.filter(lambda x : 'a' in x[1])
for element in rdd6.collect():
    print(element)

(2, 'Wonderland')


# RDD Actions 
* operations that trigger computation and return RDD values.
* return the values from an RDD to a driver program. In other words, any RDD function that returns non-RDD is considered as an action. 

In [36]:
print("rdd6 = ",rdd6.collect())
# Action - count
print("Count : "+str(rdd6.count()))


# Action - first
firstRec = rdd6.first()
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])


# Action - max
datMax = rdd6.max()
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])


# Action - reduce
totalWordCount = rdd6.reduce(lambda a,b: (a[0]+b[0],a[1]))
print("dataReduce Record : "+str(totalWordCount[0]))


# Action - take
data3 = rdd6.take(3)
for f in data3:
    print("data3 Key:"+ str(f[0]) +", Value:"+f[1])


rdd6 =  [(2, 'Wonderland')]
Count : 1
First Record : 2,Wonderland
Max Record : 2,Wonderland
dataReduce Record : 2
data3 Key:2, Value:Wonderland


In [37]:
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]
inputRDD = spark.sparkContext.parallelize(data)
  
listRdd = spark.sparkContext.parallelize([1,2,3,4,5,3,2])

listRdd.collect() 


[1, 2, 3, 4, 5, 3, 2]

In [38]:
#aggregate - Aggregate the elements of each partition, and then the results for all the partitions, 
#            using a given combine functions “combOp” and a neutral “zero value.”

seqOp = (lambda x, y: x + y) #partition aggregate function
combOp = (lambda x, y: x + y) #combine aggregate function

agg=listRdd.aggregate(0, seqOp, combOp)

print(agg) # output 20

#aggregate 2
seqOp2 = (lambda x, y: (x[0] + y, x[1] + 1))
combOp2 = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
agg2=listRdd.aggregate((0, 0), seqOp2, combOp2)

print(agg2) # output (20,7)

agg2=listRdd.treeAggregate(0,seqOp, combOp)
print(agg2) # output 20

20
(20, 7)
20


In [39]:
#fold - Aggregate the elements of each partition, and then the results for all the partitions
from operator import add
foldRes=listRdd.fold(0, add)
print(foldRes) # output 20


20


In [40]:
#reduce - Reduces the elements of the dataset using the specified binary operator.

from operator import mul,add

redRes=listRdd.reduce(add)
print('Addition : ' ,redRes) # output 20

redRes=listRdd.reduce(mul)
print('Multiplication : ',redRes) # output 720

Addition :  20
Multiplication :  720


In [41]:
#treeReduce. This is similar to reduce. Reduces the elements of this RDD in a multi-level tree pattern

add = lambda x, y: x + y
redRes=listRdd.treeReduce(add)
print(redRes) # output 20

20


In [42]:
#Collect - Return the complete dataset as an Array.
data = listRdd.collect()
print(data)

[1, 2, 3, 4, 5, 3, 2]


In [43]:
#count, countApprox, countApproxDistinct

print("Count : "+str(listRdd.count()))
#Output: Count : 20

print("countApprox : "+str(listRdd.countApprox(1200)))
#Output: countApprox : (final: [7.000, 7.000])

print("countApproxDistinct : "+str(listRdd.countApproxDistinct()))
#Output: countApproxDistinct : 5

#countByValue, countByValueApprox
print("countByValue :  "+str(listRdd.countByValue()))


Count : 7
countApprox : 7
countApproxDistinct : 5
countByValue :  defaultdict(<class 'int'>, {1: 1, 2: 2, 3: 2, 4: 1, 5: 1})


In [44]:
#first – Return the first element in the dataset.
print("first :  "+str(listRdd.first()))
#Output: first :  1
print("first :  "+str(inputRDD.first()))
#Output: first :  (Z,1)



#top– Return top n elements from the dataset.
print("top : "+str(listRdd.top(2)))
#Output: take : 5,4
print("top : "+str(inputRDD.top(2)))
#Output: take : (Z,1),(C,40)


first :  1
first :  ('Z', 1)
top : [5, 4]
top : [('Z', 1), ('C', 40)]


In [45]:
#min
print("min :  "+str(listRdd.min()))
#Output: min :  1
print("min :  "+str(inputRDD.min()))
#Output: min :  (A,20)  

#max
print("max :  "+str(listRdd.max()))
#Output: max :  5
print("max :  "+str(inputRDD.max()))
#Output: max :  (Z,1)

min :  1
min :  ('A', 20)
max :  5
max :  ('Z', 1)


In [46]:
#take, takeOrdered, takeSample
print("take : "+str(listRdd.take(2)))
#Output: take : 1,2
print("takeOrdered : "+ str(listRdd.takeOrdered(2)))
#Output: takeOrdered : 1,2


#print("take : "+str(listRdd.takeSample()))

take : [1, 2]
takeOrdered : [1, 2]


Refer this page for more info on RDD : https://sparkbyexamples.com/pyspark-rdd

# END