In [1]:
## RDD creation
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("RDD_creation").getOrCreate()


Note: Creating SparkSession object, internally creates one SparkContext per JVM.

In [2]:
## Creating RDD by using Parallelize
data = [1,2,3,4,5,6,7,8,9,10]
rdd = spark.sparkContext.parallelize(data)
print(rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [3]:
spark.sparkContext.uiWebUrl

'http://Rangapavan:4040'

In [4]:
## read data from text file
rdd2 = spark.sparkContext.textFile("test.txt")
print(rdd2.collect())

['Project Gutenberg’s', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Project Gutenberg’s', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Project Gutenberg’s', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'T

In [5]:
## creating an empty rdd
rdd3 = spark.sparkContext.emptyRDD()
print(rdd3)

EmptyRDD[3] at emptyRDD at DirectMethodHandleAccessor.java:103


In [6]:
## creating an empty rdd with partition
rdd4 = spark.sparkContext.parallelize([],10) # this creates 10 partitions
print("number of partitions:" , rdd4.getNumPartitions()) # this method returns the number of partitions

number of partitions: 10


In [7]:
## initial partititon
print("initial number of partitions:" , rdd.getNumPartitions())

initial number of partitions: 1


In [8]:
## set partitions manually
spark.sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10)

ParallelCollectionRDD[5] at readRDDFromFile at PythonRDD.scala:289

In [9]:
## repartition the RDD
repartitionRDD = rdd.repartition(5)
print("number of partitions:" , repartitionRDD.getNumPartitions())

number of partitions: 5


In [10]:
print(rdd2.collect())

['Project Gutenberg’s', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Project Gutenberg’s', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Project Gutenberg’s', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'Alice’s Adventures in Wonderland', 'by Lewis Carroll', 'This eBook is for the use', 'of anyone anywhere', 'at no cost and with', 'T

## Transformation operations

In [11]:
## flatMap method
rdd5 = rdd2.flatMap(lambda x:x.split())
print(rdd5)


PythonRDD[10] at RDD at PythonRDD.scala:53


In [12]:
print(rdd5.collect())

['Project', 'Gutenberg’s', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Project', 'Gutenberg’s', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Project', 'Gutenberg

In [13]:

# Apply the mpa() transformation 
# Add a new element with value 1 to each word
rdd6 = rdd5.map(lambda x: (x,1))
print(rdd6.collect())

[('Project', 1), ('Gutenberg’s', 1), ('Alice’s', 1), ('Adventures', 1), ('in', 1), ('Wonderland', 1), ('by', 1), ('Lewis', 1), ('Carroll', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of', 1), ('anyone', 1), ('anywhere', 1), ('at', 1), ('no', 1), ('cost', 1), ('and', 1), ('with', 1), ('Alice’s', 1), ('Adventures', 1), ('in', 1), ('Wonderland', 1), ('by', 1), ('Lewis', 1), ('Carroll', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of', 1), ('anyone', 1), ('anywhere', 1), ('at', 1), ('no', 1), ('cost', 1), ('and', 1), ('with', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of', 1), ('anyone', 1), ('anywhere', 1), ('at', 1), ('no', 1), ('cost', 1), ('and', 1), ('with', 1), ('Project', 1), ('Gutenberg’s', 1), ('Alice’s', 1), ('Adventures', 1), ('in', 1), ('Wonderland', 1), ('by', 1), ('Lewis', 1), ('Carroll', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of

In [14]:
## reducedByKey method
rdd7 = rdd6.reduceByKey(lambda a,b: a+b)
print(rdd7)

PythonRDD[16] at RDD at PythonRDD.scala:53


In [15]:
print(rdd7.collect())

[('Project', 9), ('Gutenberg’s', 9), ('Alice’s', 18), ('Adventures', 18), ('in', 18), ('Wonderland', 18), ('by', 18), ('Lewis', 18), ('Carroll', 18), ('This', 27), ('eBook', 27), ('is', 27), ('for', 27), ('the', 27), ('use', 27), ('of', 27), ('anyone', 27), ('anywhere', 27), ('at', 27), ('no', 27), ('cost', 27), ('and', 27), ('with', 27)]


In [16]:
## sortByKey
rdd8 = rdd7.map(lambda x:(x[1],x[0])).sortByKey()
print(rdd8.collect())

[(9, 'Project'), (9, 'Gutenberg’s'), (18, 'Alice’s'), (18, 'Adventures'), (18, 'in'), (18, 'Wonderland'), (18, 'by'), (18, 'Lewis'), (18, 'Carroll'), (27, 'This'), (27, 'eBook'), (27, 'is'), (27, 'for'), (27, 'the'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'cost'), (27, 'and'), (27, 'with')]


## RDD Actions

In [17]:
## count() – Returns the number of records in an RDD
# Action - count
print("Count : "+str(rdd8.count()))

Count : 23


In [18]:
## first() – Returns the first record.
# Action - first
firstRec = rdd8.first()
print(firstRec)
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])

(9, 'Project')
First Record : 9,Project


In [19]:
## max() – Returns max record.
# Action - max
datMax = rdd8.max()
print(datMax)
print("Max Record : "+str(datMax[0]) + ","+ str(datMax[1]))

(27, 'with')
Max Record : 27,with


In [20]:
## reduce() – Reduces the records to single, we can use this to count or sum.


# Action - reduce
totalWordCount = rdd8.reduce(lambda a,b: (a[0]+b[0],a[1]))
print("dataReduce Record : "+str(totalWordCount[0]))

dataReduce Record : 522


In [21]:
## take() – Returns the record specified as an argument.
# Action - take
data3 = rdd8.take(3)
for f in data3:
    print("data3 Key:"+ str(f[0]) +", Value:"+f[1])

data3 Key:9, Value:Project
data3 Key:9, Value:Gutenberg’s
data3 Key:18, Value:Alice’s


In [22]:
## collect() – Returns all data from RDD as an array.
## Be careful when you use this action when you are working with huge RDD with millions and billions
## of data as you may run out of memory on the driver.


# Action - collect
data = rdd8.collect()
for f in data:
    print("Key:"+ str(f[0]) +", Value:"+f[1])

Key:9, Value:Project
Key:9, Value:Gutenberg’s
Key:18, Value:Alice’s
Key:18, Value:Adventures
Key:18, Value:in
Key:18, Value:Wonderland
Key:18, Value:by
Key:18, Value:Lewis
Key:18, Value:Carroll
Key:27, Value:This
Key:27, Value:eBook
Key:27, Value:is
Key:27, Value:for
Key:27, Value:the
Key:27, Value:use
Key:27, Value:of
Key:27, Value:anyone
Key:27, Value:anywhere
Key:27, Value:at
Key:27, Value:no
Key:27, Value:cost
Key:27, Value:and
Key:27, Value:with


In [23]:
## saveAsTextFile() – Using saveAsTestFile action, we can write the RDD to a text file.


rdd8.saveAsTextFile("/wordCount")

Py4JJavaError: An error occurred while calling o158.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/wordCount already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
