<h2>RDD Creation</h2>
<p>
You can create an RDD in two ways:</br>
<ul>
<li>
parallelizing an existing collection
</li>
<li>
referencing a dataset in an external storage system (HDFS, S3 and many more). 
</li>
</ul>
</p>
<p>
<strong>
Initialize a SparkSession using the builder pattern defined in the SparkSession class.
</strong>
</p>
<p height="200" width="100%" style="background:black;font-size:20px;color:white">
# Imports</br>
from pyspark.sql import SparkSession</br>
</br>
# Create SparkSession</br>
spark = (SparkSession.builder</br>
      .master("local[1]")</br>
      .appName("SparkByExamples.com")</br>
      .getOrCreate())</br>
</p>
<p>
<strong>
master()
</strong>
If you are running on a cluster, pass your master name as the argument to master().</br> Usually this is either yarn (Yet Another Resource Negotiator) or mesos, depending on your cluster setup.
</p>
<p>
<strong>
local[x]:
</strong>
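When running locally rather than on a cluster, specify 'local[x]', where 'x' is an integer greater than 0, to set the number of worker threads Spark uses. This value also becomes the default number of partitions for RDDs created with parallelize().</br> Ideally, set 'x' to match the number of CPU cores available on your system; 'local[*]' uses as many threads as there are logical cores.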
</p>
<p>
<strong>
appName():
</strong>
Sets the name of your application, which is shown in the Spark web UI.
</p>
<p>
<strong>
getOrCreate():
</strong>
Returns the existing SparkSession if one already exists; otherwise, creates a new one.
</p>
<h2>Using sparkContext.parallelize()</h2>
<p>
<strong>
parallelize()
</strong> is a function of SparkContext (sparkContext.parallelize()) that creates an RDD by distributing an existing collection from your driver program.</br> Use this method when the data is already in memory, for example after loading it from a file or a database. All of the data must be present in the driver program before the RDD is created.
</p>
<img width="200" height="300" src="parallelize.png">
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Create RDD from parallelize</br>  
data = [1,2,3,4,5,6,7,8,9,10,11,12]</br>
rdd = spark.sparkContext.parallelize(data)</br>
</p>
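<p>
To picture what "distributing a collection" means, here is a minimal plain-Python sketch that cuts a list into contiguous, near-equal slices, one per partition. The helper <code>split_into_slices</code> is hypothetical and only illustrates the idea; the exact partition boundaries Spark chooses may differ.
</p>

```python
def split_into_slices(data, num_slices):
    """Hypothetical helper: cut data into num_slices contiguous, near-equal slices."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

# Illustration only: Spark's own slicing is not guaranteed to match this
slices = split_into_slices(data, 4)
print(slices)
```

<p>
With a real RDD, <code>rdd.getNumPartitions()</code> reports how many partitions were created.
</p>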
<h2>Using sparkContext.textFile()</h2>
<p>
Use the textFile() method to read a text file into an RDD. Each line of the file becomes one record.
</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Create RDD from external Data source</br>
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")</br>
</p>
<h2>Using sparkContext.wholeTextFiles()</h2>
<p>
The <strong>wholeTextFiles()</strong> function returns a pair RDD in which each key is a file path and each value is the content of that file.
</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Read each file into an RDD as a single (path, content) record</br>
rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")</br>
</p>
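<p>
The difference between the two readers is the shape of the records. The plain-Python sketch below mimics that shape without Spark: <code>read_lines</code> and <code>read_whole</code> are hypothetical stand-ins for textFile() and wholeTextFiles(), and the sample file is created only for the illustration.
</p>

```python
import os
import tempfile


def read_lines(path):
    """Stand-in for textFile(): one record per line."""
    with open(path, encoding="utf-8") as f:
        return [line.rstrip("\n") for line in f]


def read_whole(path):
    """Stand-in for wholeTextFiles(): one (path, content) record per file."""
    with open(path, encoding="utf-8") as f:
        return [(path, f.read())]


with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "textFile.txt")
    with open(sample_path, "w", encoding="utf-8") as f:
        f.write("first line\nsecond line\n")
    line_records = read_lines(sample_path)
    whole_records = read_whole(sample_path)

print(len(line_records))   # two records, one per line
print(len(whole_records))  # one record for the whole file
```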
<h2>Using sparkContext.emptyRDD()</h2>
<p>
The <strong>emptyRDD()</strong> method on sparkContext creates an RDD with no data and no partitions.
</p>
<p height="100" width="100%" style="background:black;font-size:20px;color:white">
# Create an empty RDD with no partition</br> 
rdd = spark.sparkContext.emptyRDD()</br>
</br>
# The RDD contains no records</br>
print(rdd.isEmpty())  # True</br>
</p>
<h2>Creating an empty RDD with partitions</h2>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Create an empty RDD with partitions</br>
rdd2 = spark.sparkContext.parallelize([], 10)  # This creates 10 partitions</br>
</p>

<h2>PySpark RDD Operations</h2>
<p>
RDD operations fall into two categories: transformations and actions.
</p>
<p>
<strong>RDD transformations</strong> – Lazy operations that return a new RDD instead of updating the existing one.</br>
<strong>RDD actions</strong> – Operations that trigger computation and return values to the driver program.
</p>
<h2>RDD Transformations</h2>
<p>
<strong>Transformations on a PySpark RDD</strong> are lazy: they do not execute until you call an action on the RDD.</br>Common transformations include flatMap(), map(), reduceByKey(), filter(), and sortByKey(). Each returns a new RDD instead of updating the current one.
</p>
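<p>
Lazy evaluation can be seen on a small scale with Python's built-in <code>map</code>, which also defers work until its result is consumed. This is only an analogy in plain Python, not Spark code; the <code>calls</code> list and <code>tag_length</code> function are illustrative names.
</p>

```python
calls = []  # records each time the mapping function actually runs


def tag_length(word):
    calls.append(word)
    return (word, len(word))


words = ["spark", "rdd", "action"]

# "Transformation": builds a lazy pipeline, nothing is computed yet
pipeline = map(tag_length, words)
calls_before_action = len(calls)

# "Action": consuming the pipeline forces the computation
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action)
```

<p>
The function does not run when the pipeline is defined, only when the result is requested. Spark behaves similarly: transformations describe the work, and an action triggers it.
</p>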
<p>
Let’s use transformations to build a word count example.
</p>
<p>
First, read the text file into an RDD.
</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Read data from text file</br>
rdd = spark.sparkContext.textFile("test.txt")</br>
</p>
<p>The <strong>flatMap()</strong> transformation applies a function that returns a sequence to each element, then flattens the results into a single new RDD. Here, each line is split into words, so the resulting RDD holds one word per record.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# split the data by space and flatten it.</br>
rdd2 = rdd.flatMap(lambda x: x.split(" "))</br>
</p>
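<p>
To see what "flatten" means here, the plain-Python sketch below applies the same split function in a map-like way and in a flatMap-like way. It runs without Spark, and the sample lines are made up for the illustration.
</p>

```python
from itertools import chain

lines = ["hello spark", "hello rdd"]

# map-like: one output per input, so the result is a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap-like: apply the same function, then flatten one level
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```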
<p>
The <strong>map()</strong> transformation applies a function to each element and returns a new RDD with exactly one output element per input element. Here, it pairs each word with the value 1.
</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Apply the map() transformation</br>
# Pair each word with the value 1</br>
rdd3 = rdd2.map(lambda x: (x,1))</br>
</p>
<p>
<strong>reduceByKey()</strong> combines the values associated with each key using the provided function. In our scenario, it sums the values for each word, which gives the number of times the word occurs.
</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Use reduceByKey()</br>
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)</br>
</p>
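<p>
The merging that reduceByKey() performs can be sketched in plain Python with a dictionary. The helper <code>reduce_by_key</code> is hypothetical and runs on a single machine; Spark does the same kind of merge across partitions and does not guarantee the order of the resulting keys.
</p>

```python
def reduce_by_key(pairs, func):
    """Hypothetical helper: merge the values of equal keys with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


# (word, 1) pairs, as produced by the map() step
pairs = [("hello", 1), ("spark", 1), ("hello", 1), ("rdd", 1)]

word_counts = reduce_by_key(pairs, lambda a, b: a + b)
print(word_counts)
```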

<h2>RDD Actions</h2>

<p><strong>RDD Action operations</strong> trigger the execution of transformations on RDDs (Resilient Distributed Datasets) and produce a result that can be either returned to the driver program or saved to an external storage system.
</p>
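<p>
The action examples that follow use <code>rdd6</code>, which is not defined in this section. Their indexing suggests that each record is a (count, word) pair with an integer count and a string word. Under that assumption, the plain-Python sketch below uses a made-up list of such pairs to show what several of the actions compute.
</p>

```python
from functools import reduce

# Assumed record shape for rdd6: (count, word). The values are made up.
records = [(1, "spark"), (3, "hadoop"), (2, "data"), (3, "scala")]

record_count = len(records)   # like count()
first_record = records[0]     # like first()
max_record = max(records)     # like max(): tuples compare element by element
# like reduce(): sums the counts and keeps the word from the left-hand record
total = reduce(lambda a, b: (a[0] + b[0], a[1]), records)
first_three = records[:3]     # like take(3)

print(record_count, first_record, max_record, total[0], first_three)
```

<p>
On a real RDD, only the summed count in the reduce() result is dependable; which word ends up in the second position depends on how the records are partitioned.
</p>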
<p><strong>count()</strong> - Returns the number of records in the RDD.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Action - count</br>
print("Count : "+str(rdd6.count()))</br>
</p>
<p><strong>first()</strong> - Returns the first record.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Action - first</br>
firstRec = rdd6.first()</br>
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])</br>
</p>
<p><strong>max()</strong> - Returns the maximum record.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Action - max</br>
datMax = rdd6.max()</br>
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])</br>
</p>
<p><strong>reduce()</strong> – Reduces the records to a single value using the provided function. We can use this to count or sum.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Action - reduce</br>
totalWordCount = rdd6.reduce(lambda a,b: (a[0]+b[0],a[1]))</br>
print("dataReduce Record : "+str(totalWordCount[0]))</br>
</p>
<p><strong>take()</strong> – Returns the first n records, where n is the number passed as the argument.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
# Action - take</br>
data3 = rdd6.take(3)</br>
for f in data3:</br>
    print("data3 Key:"+ str(f[0]) +", Value:"+f[1])</br>
</p>
<p><strong>collect()</strong> – Returns all data from the RDD as a list. Be careful with this action on a huge RDD with millions or billions of records, as the driver may run out of memory.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
</br>
# Action - collect</br>
data = rdd6.collect()</br>
for f in data:</br>
    print("Key:"+ str(f[0]) +", Value:"+f[1])</br>
</p>
<p><strong>saveAsTextFile()</strong> – Using the saveAsTextFile() action, we can write the RDD to text files.</p>
<p height="50" width="100%" style="background:black;font-size:20px;color:white">
</br>
# Action - saveAsTextFile (the output path below is a placeholder)</br>
# Spark writes a directory of part files; the path must not already exist</br>
rdd6.saveAsTextFile("/tmp/wordCount")</br>
</p>