Question 1. Working with RDDs:
   a) Write a Python program to create an RDD from a local data source.
   b) Implement transformations and actions on the RDD to perform data processing tasks.
   c) Analyze and manipulate data using RDD operations such as map, filter, reduce, or aggregate.


Solution:-
    
a) Creating RDDs from Local Collections:
One of the simplest ways to create an RDD is from a local collection in memory. Spark provides the parallelize method, which takes a collection and distributes it across the cluster. Let's consider an example where we have a list of numbers and want to create an RDD from it:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Create a SparkContext
val conf = new SparkConf().setAppName("RDD Creation Example").setMaster("local")
val sc = new SparkContext(conf)

// Create an RDD from a local collection
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)

// Perform operations on the RDD
// For example, let's calculate the sum of the numbers
val sumOfNumbers = rdd.sum()
println("Sum of numbers: " + sumOfNumbers)

// Stop the SparkContext
sc.stop()
```


```python
from pyspark import SparkContext

# Create a SparkContext
sc = SparkContext("local", "RDD Creation Example")

# Create an RDD from a local collection
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Perform operations on the RDD
# For example, let's calculate the sum of the numbers
sum_of_numbers = rdd.sum()
print("Sum of numbers:", sum_of_numbers)

# Stop the SparkContext
sc.stop()
```

Creating RDDs from Files:
Spark supports reading data from various file formats such as text files, CSV files, JSON files, and more. Let’s consider an example where we have a text file containing customer information, and we want to create an RDD from it:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Create a SparkContext
val conf = new SparkConf().setAppName("RDD Creation Example").setMaster("local")
val sc = new SparkContext(conf)

// Create an RDD from a text file
val rdd = sc.textFile("customer_data.txt")

// Perform operations on the RDD
// For example, let's count the number of lines
val lineCount = rdd.count()
println("Number of lines: " + lineCount)

// Stop the SparkContext
sc.stop()
```


```python
from pyspark import SparkContext

# Create a SparkContext
sc = SparkContext("local", "RDD Creation Example")

# Create an RDD from a text file
rdd = sc.textFile("customer_data.txt")

# Perform operations on the RDD
# For example, let's count the number of lines
line_count = rdd.count()
print("Number of lines:", line_count)

# Stop the SparkContext
sc.stop()
```

 b) Implement transformations and actions on the RDD to perform data processing tasks.
    
Solution:- Spark RDD Operations: Transformations and Actions with Examples

1. Spark RDD Operations
Apache Spark RDDs support two types of operations: transformations and actions.
A transformation is a function that produces a new RDD from existing RDDs. When we want to work with the actual dataset, an action is performed; unlike a transformation, an action does not produce a new RDD.
In this part we look at what transformations and actions are, with examples of the most common RDD transformation and action operations.

2. Apache Spark RDD Operations
Apache Spark RDDs support two types of operations:

Transformations
Actions
Let us first look at transformations and then at actions.

3. RDD Transformation
A Spark transformation is a function that produces a new RDD from existing RDDs. It takes an RDD as input and produces one or more RDDs as output, creating a new RDD every time it is applied. The input RDDs cannot be changed, since RDDs are immutable.

Applying transformations builds an RDD lineage containing all the parent RDDs of the final RDD(s). The RDD lineage, also known as the RDD operator graph or RDD dependency graph, is a logical execution plan: a Directed Acyclic Graph (DAG) of all the parent RDDs of an RDD.

Transformations are lazy: they are not executed immediately, only when an action is called. The two most basic transformations are map() and filter().
The resulting RDD is always a new RDD, distinct from its parent. It can be smaller (e.g. filter(), distinct(), sample()), bigger (e.g. flatMap(), union(), cartesian()) or the same size (e.g. map()).
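To make lazy evaluation and lineage concrete, here is a toy single-machine model in plain Python. ToyRDD is a hypothetical class written for this illustration, not Spark's RDD: transformations only record a new step in the lineage, and nothing runs until an action such as collect() or count() is called. Like an uncached Spark RDD, it recomputes the whole lineage for every action.

```python
# Toy model of lazy evaluation and lineage. ToyRDD is hypothetical, not Spark.
class ToyRDD:
    def __init__(self, compute, lineage):
        self._compute = compute  # zero-argument function that yields the data
        self.lineage = lineage   # operation names, oldest parent first

    @staticmethod
    def parallelize(data):
        data = list(data)
        return ToyRDD(lambda: iter(data), ["parallelize"])

    # Transformations: return a new ToyRDD, compute nothing yet
    def map(self, f):
        parent = self
        return ToyRDD(lambda: (f(x) for x in parent._compute()),
                      self.lineage + ["map"])

    def filter(self, pred):
        parent = self
        return ToyRDD(lambda: (x for x in parent._compute() if pred(x)),
                      self.lineage + ["filter"])

    # Actions: walk the lineage and actually produce values
    def collect(self):
        return list(self._compute())

    def count(self):
        return sum(1 for _ in self._compute())


calls = []  # records every time the map function really runs

def add_two(x):
    calls.append(x)
    return x + 2

rdd = ToyRDD.parallelize([1, 2, 3, 4, 5])
evens_plus_two = rdd.map(add_two).filter(lambda x: x % 2 == 0)

print(evens_plus_two.lineage)    # ['parallelize', 'map', 'filter']
print(len(calls))                # 0: no transformation has run yet
print(evens_plus_two.collect())  # [4, 6]
print(len(calls))                # 5: the action triggered the map
```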

There are two types of transformations:

Narrow transformation – In a narrow transformation, all the elements required to compute the records in a single partition live in a single partition of the parent RDD, so only a limited subset of partitions is needed to compute the result. map() and filter() are narrow transformations.

Wide transformation – In a wide transformation, the elements required to compute the records in a single partition may live in many partitions of the parent RDD, so data has to be shuffled between partitions. groupByKey() and reduceByKey() are wide transformations.
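The difference between narrow and wide transformations can be sketched in plain Python (no Spark), assuming an RDD is modelled as a list of partitions. owner() is a hypothetical, deterministic stand-in for Spark's hash partitioning; the pairs are the same ones used in the groupByKey() example later in this section.

```python
# Plain-Python model: an "RDD" is a list of partitions, each a list of records.
from collections import defaultdict

def owner(key, num_out):
    # Hypothetical stand-in for a hash partitioner: the same key always
    # maps to the same output partition.
    return sum(ord(c) for c in str(key)) % num_out

def narrow_map(partitions, f):
    # Narrow: output partition i is computed from input partition i alone.
    return [[f(x) for x in part] for part in partitions]

def wide_group_by_key(partitions, num_out):
    # Wide: values for one key can sit in any input partition, so every
    # record is routed ("shuffled") to the output partition that owns its key.
    buckets = [defaultdict(list) for _ in range(num_out)]
    for part in partitions:
        for key, value in part:
            buckets[owner(key, num_out)][key].append(value)
    return [sorted(b.items()) for b in buckets]

parts = [[("k", 5), ("s", 3)], [("s", 4), ("p", 7)], [("p", 5), ("t", 8), ("k", 6)]]

mapped = narrow_map(parts, lambda kv: (kv[0], kv[1] * 10))
print([len(p) for p in mapped])  # [2, 2, 3]: partitioning is unchanged

grouped = wide_group_by_key(parts, 2)
print(grouped)
# [[('p', [7, 5]), ('t', [8])], [('k', [5, 6]), ('s', [3, 4])]]
```

Every key ends up in exactly one output partition, which is only possible because records from all input partitions were moved.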

There are various functions in RDD transformation. Let us see RDD transformation with examples.

3.1. map(func)
The map() transformation applies a function to every element of the RDD and returns a new RDD with the results, one output element per input element.

With map(), the input and return types may differ. For example, the input RDD can be of type String and, after applying map(), the returned RDD can be of type Boolean.

For example, in RDD {1, 2, 3, 4, 5} if we apply “rdd.map(x=>x+2)” we will get the result as (3, 4, 5, 6, 7).


Map() example:

```scala
import org.apache.spark.sql.SparkSession

object mapTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
    val data = spark.read.textFile("spark_test.txt").rdd
    // Pair each line with its length
    val mapFile = data.map(line => (line, line.length))
    mapFile.foreach(println)
    spark.stop()
  }
}
```

Contents of spark_test.txt:

```
hello...user! this file is created to check the operations of spark.
?, and how can we apply functions on that RDD partitions?. All this will be done through spark programming which is done with the help of scala language support…
```

Note – In the above code, map() pairs each line of the file with its length.
3.2. flatMap()
With flatMap(), each input element can produce many (zero or more) elements in the output RDD. The simplest use of flatMap() is splitting each input string into words.
map() and flatMap() are similar in that both take an element (here, a line) from the input RDD and apply a function to it. The key difference is that map() returns exactly one element per input, while flatMap() can return a sequence of elements, which is flattened into the output RDD.

flatMap() example:

```scala
val data = spark.read.textFile("spark_test.txt").rdd
// Split every line on spaces; each word becomes its own element
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.foreach(println)
```

Note – In the above code, flatMap() splits each line at every space.
3.3. filter(func)
Spark RDD filter() function returns a new RDD, containing only the elements that meet a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.

For example, suppose an RDD contains the first five natural numbers (1, 2, 3, 4 and 5) and the predicate checks for an even number. The RDD after the filter will contain only the even numbers, i.e. 2 and 4.

Filter() example:

```scala
val data = spark.read.textFile("spark_test.txt").rdd
// Split lines into words, keep only the exact word "spark", then count
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value == "spark")
println(mapFile.count())
```

Note – In the above code, flatMap() splits each line into words, filter() keeps only the words equal to "spark", and the count() action returns how many there are. The comparison is exact and case-sensitive, so "Spark" or "spark." is not counted.

3.4. mapPartitions(func)
mapPartitions() is like map(), but func is called once per partition (block) of the source RDD instead of once per element. func receives an iterator over the elements of one partition and returns an iterator of results, which may contain any number of elements (possibly none). This makes it useful for per-partition setup, such as opening one database connection for all the elements of a partition.

3.5. mapPartitionsWithIndex(func)
It is like mapPartitions(), but func also receives an integer representing the index of the partition being processed.
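The per-partition behaviour of mapPartitions() and mapPartitionsWithIndex() can be modelled in plain Python, with each partition represented as a list. In PySpark the real calls are rdd.mapPartitions(f) and rdd.mapPartitionsWithIndex(f), where f receives an iterator (and, for the indexed variant, the partition index first); the helper names below are hypothetical.

```python
# Plain-Python model of mapPartitions / mapPartitionsWithIndex semantics.
# The user function gets an iterator over one partition and returns an
# iterable; it runs once per partition, not once per element.

def map_partitions(partitions, func):
    return [list(func(iter(part))) for part in partitions]

def map_partitions_with_index(partitions, func):
    return [list(func(i, iter(part))) for i, part in enumerate(partitions)]

setup_calls = 0

def sum_partition(it):
    global setup_calls
    setup_calls += 1   # where per-partition setup (e.g. a DB connection) would go
    yield sum(it)      # one output element for the whole partition

parts = [[1, 2], [3, 4], [5]]
print(map_partitions(parts, sum_partition))  # [[3], [7], [5]]
print(setup_calls)                           # 3: once per partition

tagged = map_partitions_with_index(parts, lambda i, it: ((i, x) for x in it))
print(tagged)  # [[(0, 1), (0, 2)], [(1, 3), (1, 4)], [(2, 5)]]
```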


3.6. union(dataset)
With union(), we get the elements of both RDDs in a new RDD. The key rule is that the two RDDs must have the same element type.
For example, if the elements of RDD1 are (Spark, Spark, Hadoop, Flink) and those of RDD2 are (Big data, Spark, Flink), then rdd1.union(rdd2) will have the elements (Spark, Spark, Hadoop, Flink, Big data, Spark, Flink); duplicates are kept.

Union() example:

```scala
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(17,"sep",2015)))
val rdd3 = spark.sparkContext.parallelize(Seq((6,"dec",2011),(16,"may",2015)))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.foreach(println)
```

Note – In above code union() operation will return a new dataset that contains the union of the elements in the source dataset (rdd1) and the argument (rdd2 & rdd3).
3.7. intersection(other-dataset)
With intersection(), we get only the elements common to both RDDs in a new RDD, without duplicates. The two RDDs must have the same element type.
For example, if the elements of RDD1 are (Spark, Spark, Hadoop, Flink) and those of RDD2 are (Big data, Spark, Flink), then rdd1.intersection(rdd2) will have the elements (Spark, Flink).

Intersection() example:

```scala
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(1,"jan",2016)))
val common = rdd1.intersection(rdd2)
common.foreach(println)
```

Note – The intersection() operation returns a new RDD containing the elements present in both rdd1 and rdd2.

3.8. distinct()
It returns a new dataset that contains the distinct elements of the source dataset. It is helpful to remove duplicate data.
For example, if RDD has elements (Spark, Spark, Hadoop, Flink), then rdd.distinct() will give elements (Spark, Hadoop, Flink).

Distinct() example:

```scala
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014),(3,"nov",2014)))
val result = rdd1.distinct()
println(result.collect().mkString(", "))
```

Note – In the above example, distinct() removes the duplicate record (3,"nov",2014).
3.9. groupByKey()
When we use groupByKey() on a dataset of (K, V) pairs, the data is shuffled according to the key K into a new RDD that groups all the values for each key. Because every pair is sent across the network, this transformation can transfer a lot of unnecessary data.

Spark can spill data to disk when more data is shuffled onto a single executor than fits in memory.

groupByKey() example:

```scala
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)
```

Note – groupByKey() groups the integers by their key (a character). The collect() action then returns all the elements of the resulting dataset to the driver as an Array.
3.10. reduceByKey(func, [numTasks])
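When we use reduceByKey() on a dataset of (K, V) pairs, values with the same key are first combined within each partition using func, before the data is shuffled; the partial results are then merged after the shuffle. This usually moves much less data than groupByKey().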

reduceByKey() example:

```scala
val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
// Pair each word with 1, then add up the 1s per word
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)
data.foreach(println)
```

Note – The above code parallelizes the array of strings, maps each word to the pair (word, 1), and then reduceByKey() adds up the counts of pairs with the same key.
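The map-side combine that makes reduceByKey() cheaper than groupByKey() can be sketched in plain Python. This is a simplified model, assuming the word pairs from the example above are split into two partitions; it counts how many records would cross the network in each approach.

```python
# Plain-Python model of why reduceByKey() usually shuffles less data than
# groupByKey(): it combines values per key inside each partition first.
from collections import defaultdict
from operator import add

words = ["one", "two", "two", "four", "five", "six", "six", "eight", "nine", "ten"]
pairs = [(w, 1) for w in words]
partitions = [pairs[:5], pairs[5:]]  # assume 2 input partitions

def group_by_key_then_sum(parts):
    shuffled = [kv for part in parts for kv in part]  # every pair is shuffled
    groups = defaultdict(list)
    for k, v in shuffled:
        groups[k].append(v)
    return {k: sum(vs) for k, vs in groups.items()}, len(shuffled)

def combine(records, func):
    # Merge values that share a key using func.
    out = {}
    for k, v in records:
        out[k] = func(out[k], v) if k in out else v
    return out

def reduce_by_key(parts, func):
    # Before the shuffle: one partial result per key per partition.
    shuffled = [kv for part in parts for kv in combine(part, func).items()]
    # After the shuffle: merge the partial results for each key.
    return combine(shuffled, func), len(shuffled)

counts_g, sent_g = group_by_key_then_sum(partitions)
counts_r, sent_r = reduce_by_key(partitions, add)
print(counts_g == counts_r)  # True: same word counts
print(sent_g, sent_r)        # 10 8: records shuffled
```

The saving is small here because few words repeat within a partition; it grows with the number of repeated keys per partition.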

3.11. sortByKey()
When we apply sortByKey() to a dataset of (K, V) pairs, the data is sorted by the key K into a new RDD.

sortByKey() example:

```scala
val data = spark.sparkContext.parallelize(Seq(("maths",52), ("english",75), ("science",82), ("computer",65), ("maths",85)))
val sorted = data.sortByKey()
// collect() brings the sorted elements to the driver in order;
// foreach directly on the RDD would print on the executors in no guaranteed order
sorted.collect().foreach(println)
```

Note – In the above code, sortByKey() sorts the data RDD in ascending order of the key (a String).

3.12. join()
Join is a database term: it combines fields from two tables using common values. In Spark, join() is defined on pair RDDs, i.e. RDDs whose elements are tuples where the first element is the key and the second element is the value.

The benefit of keyed data is that we can combine data sets together: join() combines two data sets on the basis of the key.

Join() example:

```scala
val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 = spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))
```

Note – The join() transformation joins two RDDs on the basis of their keys: for each key, every value from data is paired with every value from data2.
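The pairing rule of an inner join on pair RDDs can be modelled in plain Python. inner_join() is a hypothetical helper, not Spark code; it uses the same data as the example above. Spark may return the joined pairs in a different order.

```python
# Plain-Python model of an inner join on (key, value) pairs.
from collections import defaultdict

def inner_join(left, right):
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    # Each left pair is matched with every right value for the same key;
    # keys missing from either side are dropped.
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

data = [("A", 1), ("b", 2), ("c", 3)]
data2 = [("A", 4), ("A", 6), ("b", 7), ("c", 3), ("c", 8)]
print(inner_join(data, data2))
# [('A', (1, 4)), ('A', (1, 6)), ('b', (2, 7)), ('c', (3, 3)), ('c', (3, 8))]
```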

3.13. coalesce()
To avoid a full shuffle of data, we use coalesce(). coalesce() reuses the existing partitions so that less data is shuffled, and it is used to reduce the number of partitions. Suppose we have four partitions and want only two: the data of the extra partitions is moved onto the partitions we keep.

Coalesce() example:

```scala
val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)
```

Note – coalesce() reduces the number of partitions of the source RDD to the numPartitions given as its argument (here from 3 to 2).
4. RDD Action
Transformations create RDDs from each other, but when we want to work with the actual dataset, an action is performed. Unlike a transformation, an action does not create a new RDD: actions are RDD operations that return non-RDD values. These values are returned to the driver or written to an external storage system. Actions are what set the lazy RDD transformations in motion.

An action is one of the ways of sending data from the executors to the driver. Executors are agents responsible for executing tasks, while the driver is a JVM process that coordinates the workers and the execution of tasks. Some of Spark's actions are:

4.1. count()
Action count() returns the number of elements in RDD.

For example, RDD has values {1, 2, 2, 3, 4, 5, 5, 6} in this RDD “rdd.count()” will give the result 8.

Count() example:

```scala
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value == "spark")
println(mapFile.count())
```

Note – In the above code, flatMap() splits lines into words, filter() keeps the words equal to "spark", and the count() action returns how many such words there are.

4.2. collect()
The collect() action is the most common and simplest operation: it returns the entire content of an RDD to the driver program. A typical use of collect() is unit testing, where the entire RDD is expected to fit in memory; this makes it easy to compare the RDD's result with the expected result.
The constraint of collect() is that all the data must fit in the memory of the driver machine, since it is all copied to the driver.

Collect() example:

```scala
val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 = spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))
```

Note – The join() transformation in the above code joins two RDDs on the basis of the same key (a character). The collect() action then returns all the elements of the dataset to the driver as an Array.
4.3. take(n)
The take(n) action returns n elements from the RDD. It tries to minimize the number of partitions it accesses, so the result is a biased selection: it contains the first n elements in partition order, not a random or sorted sample.

For example, for the RDD {1, 2, 2, 3, 4, 5, 5, 6}, take(4) gives the result {1, 2, 2, 3}.

Take() example:

```scala
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
// take(2) returns the first 2 elements, scanning partitions in order
val twoRec = data.take(2)
twoRec.foreach(println)
```

Note – The take(2) action returns an array with the first n elements of the dataset, where n is the argument passed to take (here 2).
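A simplified plain-Python model of take(n) shows why the result follows partition order. take() here is a hypothetical helper that scans one partition at a time; Spark's implementation scans one partition first and then increases the number of partitions it scans in each round, but the result is likewise the first n elements in partition order.

```python
# Plain-Python model of take(n): scan partitions in order and stop as soon
# as n elements are collected, so later partitions may never be read.
def take(partitions, n):
    result, scanned = [], 0
    for part in partitions:
        if len(result) >= n:
            break
        scanned += 1
        result.extend(part[: n - len(result)])
    return result, scanned

parts = [[1, 2, 2], [3, 4, 5], [5, 6]]  # {1, 2, 2, 3, 4, 5, 5, 6} in 3 partitions
print(take(parts, 4))  # ([1, 2, 2, 3], 2): only 2 of 3 partitions were read
```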

4.4. top()
If the elements of our RDD have an ordering, we can extract the largest elements with top(). The top() action uses the default ordering of the data and returns the elements in descending order.

Top() example:

```scala
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.map(line => (line, line.length))
val res = mapFile.top(3)
res.foreach(println)
```

Note – map() pairs each line with its length, and top(3) returns the 3 largest (line, length) pairs by the default tuple ordering. Tuples are compared by their first element first, so these are the lexicographically greatest lines, not necessarily the longest ones.
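Python's tuple comparison follows the same element-by-element rule as Scala's default tuple ordering, so heapq.nlargest can illustrate what top(3) returns for (line, length) pairs. The lines below are made-up sample data. To rank by length in Spark, an ordering or key has to be passed explicitly, e.g. mapFile.top(3)(Ordering.by[(String, Int), Int](_._2)) in Scala or rdd.top(3, key=lambda p: p[1]) in PySpark.

```python
import heapq

lines = ["hello", "spark rdd", "a much longer line", "zip"]
pairs = [(line, len(line)) for line in lines]

# Default tuple ordering: compare the line text first, the length only on ties
print(heapq.nlargest(3, pairs))
# [('zip', 3), ('spark rdd', 9), ('hello', 5)]

# Ordering by the length field instead
print(heapq.nlargest(3, pairs, key=lambda p: p[1]))
# [('a much longer line', 18), ('spark rdd', 9), ('hello', 5)]
```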
4.5. countByValue()
The countByValue() action returns how many times each distinct element occurs in the RDD.

For example, RDD has values {1, 2, 2, 3, 4, 5, 5, 6} in this RDD “rdd.countByValue()”  will give the result {(1,1), (2,2), (3,1), (4,1), (5,2), (6,1)}

countByValue() example:

```scala
val data = spark.read.textFile("spark_test.txt").rdd
val result = data.map(line => (line, line.length)).countByValue()
result.foreach(println)
```

Note – countByValue() returns a map from each distinct element to its count. Here the elements are (line, length) pairs, so the result counts how many times each (line, length) pair occurs.

4.6. reduce()
The reduce() action takes a function that combines two elements of the RDD into one output of the same type as the input elements. The simplest such function is addition, which we can use to add up the elements of an RDD or count the number of words. The function passed to reduce() must be commutative and associative.

Reduce() example:

```scala
val rdd1 = spark.sparkContext.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
println(sum)
```

Note – The reduce() action in above code will add the elements of the source RDD.
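Why reduce() needs a commutative and associative function can be shown with a plain-Python model: each partition is reduced locally and the partial results are then reduced again. rdd_reduce() is a hypothetical helper; the data is the list from the example above, split two different ways.

```python
# Plain-Python model of reduce(): reduce each non-empty partition, then
# reduce the partial results. Only an associative, commutative function
# gives the same answer for every partitioning.
from functools import reduce
from operator import add, sub

def rdd_reduce(partitions, func):
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

data = [20, 32, 45, 62, 8, 5]
one_part = [data]
three_parts = [data[:2], data[2:4], data[4:]]

print(rdd_reduce(one_part, add), rdd_reduce(three_parts, add))  # 172 172
print(rdd_reduce(one_part, sub), rdd_reduce(three_parts, sub))  # -132 2
```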
4.7. fold()
The signature of fold() is like that of reduce(), but it also takes a "zero value", which is used for the initial call on each partition. The zero value should be the identity element of the operation. Another difference between fold() and reduce() is that reduce() throws an exception for an empty collection, while fold() is defined for an empty collection.

For example, zero is the identity element for addition and one is the identity element for multiplication. The return type of fold() is the same as the element type of the RDD we are operating on.
For example, rdd.fold(0)((x, y) => x + y).

Fold() example:

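```scala
val rdd1 = spark.sparkContext.parallelize(List(("maths", 80),("science", 90)))
val additionalMarks = ("extra", 4)
val sum = rdd1.fold(additionalMarks){ (acc, marks) =>
  val add = acc._2 + marks._2
  ("total", add)
}
println(sum)
```

Note – In the above code, additionalMarks is the zero value. Because ("extra", 4) is not an identity for this addition, its 4 marks are added once for every partition and once more when the partition results are merged on the driver, so the printed total depends on the number of partitions. Using ("extra", 0) as the zero value gives (total,170) regardless of partitioning.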
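The role of the zero value can be checked with a plain-Python model of how RDD fold() combines results: each partition is folded starting from the zero value, and the driver folds the partition results starting from the zero value once more. rdd_fold() is a hypothetical helper mirroring that behaviour; the records match the fold() example above.

```python
# Plain-Python model of RDD fold(zero)(op).
from functools import reduce

def rdd_fold(partitions, zero, op):
    # Each partition (even an empty one) is folded starting from zero...
    partition_results = [reduce(op, part, zero) for part in partitions]
    # ...and the driver folds those results starting from zero again.
    return reduce(op, partition_results, zero)

def add_marks(acc, marks):
    return ("total", acc[1] + marks[1])

records = [("maths", 80), ("science", 90)]
extra = ("extra", 4)

one_part = [records]
two_parts = [records[:1], records[1:]]

print(rdd_fold(one_part, extra, add_marks))          # ('total', 178)
print(rdd_fold(two_parts, extra, add_marks))         # ('total', 182)
print(rdd_fold(two_parts, ("extra", 0), add_marks))  # ('total', 170)
```

With n partitions, a zero value of 4 adds 4 * (n + 1) to the real total of 170, which is why the zero value should be an identity element.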

4.8. aggregate()
aggregate() lets us return a result of a different type from the element type. It takes a zero value of the result type plus two functions: the first combines an element of the RDD with an accumulator within a partition, and the second combines two accumulators from different partitions. Hence, in aggregate, we supply the initial zero value of the type we want to return.
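A plain-Python model of aggregate() computing an average, where the result type (a (sum, count) pair) differs from the element type (int). rdd_aggregate() is a hypothetical helper; in PySpark the corresponding call would be rdd.aggregate((0, 0), seq_op, comb_op). The numbers are the same as in the aggregate() example in part c.

```python
# Plain-Python model of aggregate(zero)(seqOp, combOp).
from functools import reduce

def rdd_aggregate(partitions, zero, seq_op, comb_op):
    # seq_op folds each element of a partition into an accumulator...
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    # ...and comb_op merges the per-partition accumulators.
    return reduce(comb_op, per_partition, zero)

def seq_op(acc, value):
    return (acc[0] + value, acc[1] + 1)   # (running sum, running count)

def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1])

parts = [[1, 2, 3], [4, 5], [3, 2]]
total, count = rdd_aggregate(parts, (0, 0), seq_op, comb_op)
print(total, count)    # 20 7
print(total / count)   # the average of the 7 numbers
```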

4.9. foreach()
When we want to apply an operation to each element of an RDD without returning a value to the driver, foreach() is useful; for example, inserting each record into a database.

Foreach() example:

```scala
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
// Call foreach on the RDD itself (the action); println runs on the executors
data.groupByKey().foreach(println)
```

Note – The foreach() action runs a function (println) on each element of the grouped RDD. The function runs on the executors, so on a cluster the output appears in the executor logs rather than on the driver console.
5. Conclusion
In conclusion, RDDs are immutable, so applying a transformation to an RDD creates another RDD.
The result is computed only when an action is called on an RDD.
This lazy evaluation lets Spark see the whole chain of transformations before running it, which reduces computation overhead and makes the system more efficient.

c) Analyze and manipulate data using RDD operations such as map, filter, reduce, or aggregate.
Solution:-  Map()-> The map transformation is the most commonly used and simplest transformation. It applies the function passed as its argument to each element of the source. In the earlier examples, map() paired each line with its length; in the example below, map() splits each string into an array of words. Note that this example calls map() on a DataFrame (a Dataset[Row]) rather than on an RDD, but the transformation works the same way.

Example:- 
val data = Seq("Project Gutenberg’s",
    "Alice’s Adventures in Wonderland",
    "Project Gutenberg’s",
    "Adventures in Wonderland",
    "Project Gutenberg’s")

import spark.sqlContext.implicits._
val df = data.toDF("data")
df.show()

//Output
+--------------------+
|                data|
+--------------------+
| Project Gutenberg’s|
|Alice’s Adventure...|
| Project Gutenberg’s|
|Adventures in Won...|
| Project Gutenberg’s|
+--------------------+


//Map Transformation
val mapDF = df.map(row => {
    row.getString(0).split(" ")
})
mapDF.show(false)

//Output
+-------------------------------------+
|value                                |
+-------------------------------------+
|[Project, Gutenberg’s]               |
|[Alice’s, Adventures, in, Wonderland]|
|[Project, Gutenberg’s]               |
|[Adventures, in, Wonderland]         |
|[Project, Gutenberg’s]               |
+-------------------------------------+

Filter()-> Filter, as the name implies, filters the input RDD and returns a new RDD containing only the elements that satisfy the predicate passed as its argument. It is a narrow operation because it does not shuffle data from one partition to many partitions.

For example, suppose an RDD contains the first five natural numbers (1, 2, 3, 4 and 5) and the predicate checks for an even number. The RDD after the filter will contain only the even numbers, i.e. 2 and 4.

Filter() example:

```scala
val data = spark.read.textFile("spark_test.txt").rdd
// Split lines into words, keep only the exact word "spark", then count
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value == "spark")
println(mapFile.count())
```

Note – In the above code, flatMap() splits each line into words, filter() keeps only the words equal to "spark", and the count() action returns how many there are. The comparison is exact and case-sensitive.

Reduce()
The reduce() action takes a function that combines two elements of the RDD into one output of the same type as the input elements. The simplest such function is addition, which we can use to add up the elements of an RDD or count the number of words. The function must be commutative and associative.

Reduce() example:

```scala
val rdd1 = spark.sparkContext.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
println(sum)
```


Aggregate()
aggregate() lets us return a result of a different type from the element type. It takes a zero value of the result type plus two functions: the first combines an element of the RDD with an accumulator within a partition, and the second combines two accumulators from different partitions.

Example:-
```scala
val listRdd = spark.sparkContext.parallelize(List(1,2,3,4,5,3,2))
// seqOp: adds an element to the running total within a partition
def param0 = (accu: Int, v: Int) => accu + v
// combOp: merges the running totals of two partitions
def param1 = (accu1: Int, accu2: Int) => accu1 + accu2
val result = listRdd.aggregate(0)(param0, param1)
println("output 1 =>" + result)
```

In this snippet, RDD type is Int and it returns the result of Int type.

Question 2. Spark DataFrame Operations:
a) Write a Python program to load a CSV file into a Spark DataFrame.

Solution:- Loading a simple CSV file into a DataFrame is very easy in Spark, but it gets messy when the raw data has newline characters inside fields.

Take a look at the sample data. The first record has an extra newline character inside the quoted description, after the words “rachel green”.

id,name,description,status
1,rachel,"rachel green 
started her career at central perk",true
2,joey,"joey tribainni's fav line is, how you doing?",true
When loaded into a DataFrame with the default options, the record is broken at the embedded newline and shows up as malformed rows. The code used was:

%python
file_location = "/FileStore/tables/multilinetext_csv.bz2"
file_type = "csv"

df = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load(file_location)
display(df)

Once the problem is identified, the fix is very simple: add one more option, “multiLine” set to “true”.

Mentioning the quote character is purely optional.

%python

df = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("multiLine", "true") \
  .option("quote","\"") \
  .load(file_location)
display(df)
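Python's standard csv module offers a quick way to see what multiLine changes, outside Spark: a newline inside a quoted field belongs to the field rather than ending the record. This is only an analogy, not Spark's CSV parser; the sample text is the data shown above.

```python
import csv
import io

raw = (
    "id,name,description,status\n"
    '1,rachel,"rachel green \n'
    'started her career at central perk",true\n'
    "2,joey,\"joey tribainni's fav line is, how you doing?\",true\n"
)

# Splitting on newlines (what a line-by-line reader sees) gives 3 "records"
print(len(raw.splitlines()) - 1)  # 3

# A quote-aware parser keeps the embedded newline inside the field
rows = list(csv.DictReader(io.StringIO(raw, newline="")))
print(len(rows))                   # 2
print(repr(rows[0]["description"]))
# 'rachel green \nstarted her career at central perk'
```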

  


 b) Perform common DataFrame operations such as filtering, grouping, or joining.
    
Solution:-  Spark DataFrame Operations
Some of the basic and frequently used spark dataframe operations would be discussed below.

Before we start, let’s create our SparkSession and SparkContext. (Note: these objects are created automatically, as spark and sc, if you’re using the pyspark shell.)

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Spark Training').getOrCreate()
sc = spark.sparkContext
Create Spark DataFrame
Below are some of the methods to create a pyspark dataframe.

Creating a Spark DataFrame from a CSV file using spark.read.
For this example, a country-wise population-by-year dataset (population_dataset) is used.

df = spark.read.format('csv').options(delimiter=',', header=True).load('/Path-to-file/population.csv')
Convert RDD to Dataframe.
Below method shows how to create DataFrame from RDD. The toDF() method can be used to convert the RDD to a dataframe

rdd = sc.parallelize([(1,2),(3,4),(5,6)])
rdf = rdd.toDF()
Using spark.createDataFrame method.
Creating a DataFrame from a list of values. Schema is inferred dynamically, if not specified.

tdf = spark.createDataFrame([('Alice',24),('David',43)],['name','age'])
tdf.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



Spark DataFrame Schema
The df.printSchema() method can be used to display the schema of spark dataframe

df.printSchema()
root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Value: string (nullable = true)
To obtain the raw schema of a dataframe, the df.schema method can be used.

df.schema
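StructType(List(StructField(Country Name,StringType,true),StructField(Country Code,StringType,true),StructField(Year,LongType,true),StructField(Value,LongType,true)))
Note that this schema (LongType for Year and Value), like the bigint types shown by dropDuplicates() below, corresponds to a load with inferSchema enabled; with the load shown earlier, every column is read as a string, as in the printSchema() output above.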
To display the columns of dataframe, df.columns method can be used. A list consisting of the columns is generated.

df.columns
['Country Name', 'Country Code', 'Year', 'Value']



Count of a Spark DataFrame
df.count()
14885



Display DataFrame Data
Display 5 rows. truncate=False can be set to display the entire column data on your terminal.

df.show(5, truncate=False)
+------------+------------+----+---------+
|Country Name|Country Code|Year|Value    |
+------------+------------+----+---------+
|Arab World  |ARB         |1960|92490932 |
|Arab World  |ARB         |1961|95044497 |
|Arab World  |ARB         |1962|97682294 |
|Arab World  |ARB         |1963|100411076|
|Arab World  |ARB         |1964|103239902|
+------------+------------+----+---------+
only showing top 5 rows



Remove Duplicate rows from a DataFrame
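df.dropDuplicates() can be used to remove duplicate rows from a Spark DataFrame. It returns a new DataFrame and leaves df unchanged, so assign the result (for example, df = df.dropDuplicates()) to keep it.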

df.dropDuplicates()
DataFrame[Country Name: string, Country Code: string, Year: bigint, Value: bigint]



Distinct Column Values
To get the distinct rows of a DataFrame, df.distinct() can be used. For our example, let’s select the distinct country codes from the dataset.

df.select('Country Code').distinct().show()
+------------+
|Country Code|
+------------+
|         HTI|
|         PSE|
|         LTE|
|         BRB|
|         LVA|
|         POL|
|         ECS|
|         TEA|
|         JAM|
|         ZMB|
|         MIC|
|         BRA|
|         ARM|
|         IDA|
|         MOZ|
|         CUB|
|         JOR|
|         OSS|
|         ABW|
|         FRA|
+------------+
only showing top 20 rows



Spark Filter Data
df.filter() can be used to filter rows in PySpark. In our example, let’s display the population of India for the years 2015 and 2016. Conditions are combined with the operators & (and) and | (or).

Note: Remember to wrap each condition in parentheses when ‘&’ or ‘|’ is used.

from pyspark.sql.functions import col
df.filter((col('Country Name') == 'India') & (col('Year').isin('2015','2016'))).show()
+------------+------------+----+----------+
|Country Name|Country Code|Year|     Value|
+------------+------------+----+----------+
|       India|         IND|2015|1309053980|
|       India|         IND|2016|1324171354|
+------------+------------+----+----------+
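The parentheses rule comes from operator precedence: in Python, & and | bind more tightly than ==, and PySpark Column expressions follow the same grammar. The plain-Python snippet below (no Spark, using an int in place of a column) shows how an unparenthesised condition is grouped.

```python
year = 2015

with_parens = (year == 2015) | (year == 2016)
# Without parentheses this parses as the chained comparison
# year == (2015 | year) == 2016, which is not what it looks like
without_parens = year == 2015 | year == 2016

print(with_parens)     # True
print(without_parens)  # False
```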



Sorting/Ordering Data in Spark
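The example below displays the distinct country names in descending order. distinct() is applied before orderBy(), because distinct() is not guaranteed to preserve an earlier ordering.

df.select('Country Name').distinct().orderBy('Country Name', ascending=False).show(truncate=False)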
+------------------------+
|Country Name            |
+------------------------+
|Zimbabwe                |
|Zambia                  |
|Yemen, Rep.             |
|World                   |
|West Bank and Gaza      |
|Virgin Islands (U.S.)   |
|Vietnam                 |
|Venezuela, RB           |
|Vanuatu                 |
|Uzbekistan              |
|Uruguay                 |
|Upper middle income     |
|United States           |
|United Kingdom          |
|United Arab Emirates    |
|Ukraine                 |
|Uganda                  |
|Tuvalu                  |
|Turks and Caicos Islands|
|Turkmenistan            |
+------------------------+
only showing top 20 rows



Grouping & Performing Aggregations in a Spark Dataframe
Obtain the total count of distinct years present in the entire dataset
# Count of Years
df.select('Year').distinct().groupBy().count().show()
+-----+
|count|
+-----+
|   57|
+-----+


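Calculate the sum of a column – Compute the sum of Value for the year 1990. The dataset also contains aggregate rows such as World and regional or income groups, so this sum is far larger than the actual world population.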
df.filter(col('Year') == '1990').agg({'Value':'sum'}).show(truncate=False)
+---------------+
|sum(Value)     |
+---------------+
|5.4935613753E10|
+---------------+

Calculate average of dataframe – Compute the average population in India for the year 2005
df.filter((col('Year') == '2005') & (col('Country Name') == 'India')).agg({'Value':'avg'}).show(truncate=False)
+-------------+
|avg(Value)   |
+-------------+
|1.144118674E9|
+-------------+

Computing the minimum of a column in a DataFrame – Display the smallest population for the year 2007
df.filter(df.Year == '2007').agg({'Value':'min'}).show()
+----------+
|min(Value)|
+----------+
|     10075|
+----------+

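Computing the maximum of a column: display the row with the largest population value for the year 2016. The Year filter is applied in both steps so that a row from another year with the same value cannot match. Note that the top row is the aggregate World row rather than a single country.
max_2016 = df.filter(df.Year == '2016').agg({'Value':'max'}).collect()[0][0]
df.filter((df.Year == '2016') & (df.Value == max_2016)).show()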
+------------+------------+----+----------+
|Country Name|Country Code|Year|     Value|
+------------+------------+----+----------+
|       World|         WLD|2016|7442135578|
+------------+------------+----+----------+



Spark Join DataFrames
A PySpark DataFrame can be joined with another using the df.join method, which takes three arguments: join(other, on=None, how=None)
other - the DataFrame to join with
on - the join condition: a column name, a list of column names, or a join expression
how - the type of join; an inner join is used by default if not specified
Other join types that can be specified are cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi and left_anti

Below is an example illustrating an inner join in PySpark.
Let's construct two DataFrames:
one with only the distinct pairs of country name and country code, and the other with the country code (renamed to ctry_cd), value and year.
The country code is the join condition here.

df1 = df.select('Country Name', 'Country Code').distinct()
df1.count()
263
df2 = df.select(col('Country Code').alias('ctry_cd'), 'Value', 'Year').distinct()
df2.count()
14885

Now let's join the two DataFrames on the country code and display the data:

from pyspark.sql.functions import col
df1.join(df2, col('Country Code') == col('ctry_cd')).show(5)
+--------------------+------------+-------+----------+----+
|        Country Name|Country Code|ctry_cd|     Value|Year|
+--------------------+------------+-------+----------+----+
|East Asia & Pacif...|         EAP|    EAP|1878255588|2004|
|Europe & Central ...|         ECA|    ECA| 396886165|2001|
|           IDA blend|         IDB|    IDB| 135810058|1964|
|           IDA blend|         IDB|    IDB| 403526930|2005|
|            IDA only|         IDX|    IDX| 984961696|2013|
+--------------------+------------+-------+----------+----+
only showing top 5 rows

The ctry_cd column duplicates Country Code, so it can be removed with the drop method before displaying the result:

df1.join(df2, col('Country Code') == col('ctry_cd')).drop(col('ctry_cd')).show(5,False)
+---------------------------------------------+------------+----------+----+
|Country Name                                 |Country Code|Value     |Year|
+---------------------------------------------+------------+----------+----+
|East Asia & Pacific (excluding high income)  |EAP         |1878255588|2004|
|Europe & Central Asia (excluding high income)|ECA         |396886165 |2001|
|IDA blend                                    |IDB         |135810058 |1964|
|IDA blend                                    |IDB         |403526930 |2005|
|IDA only                                     |IDX         |984961696 |2013|
+---------------------------------------------+------------+----------+----+
only showing top 5 rows

   

In [None]:
c) Apply Spark SQL queries on the DataFrame to extract insights from the data.

Solution:-  Spark SQL seamlessly integrates with the Spark ecosystem, providing an interface to query structured data using SQL queries. It leverages the powerful distributed computing capabilities of Spark, allowing for efficient and scalable data processing. Spark SQL extends the concept of DataFrames, offering a high-level abstraction for working with structured data.

Loading and Exploring Data

To demonstrate the capabilities of Spark SQL, let’s consider a scenario where we have a dataset containing information about online product reviews. The dataset includes attributes such as reviewer ID, product ID, rating, and review text. We will load this dataset and perform various exploratory tasks using Spark SQL.

# Import SparkSession
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Load the dataset into a DataFrame
df = spark.read.csv("path/to/reviews.csv", header=True, inferSchema=True)
Once the dataset is loaded into a DataFrame, we can explore its structure and contents. Use the following methods to gain insights:

#To view the schema of the DataFrame:
df.printSchema()
#To display the first few rows of the DataFrame:
df.show()
#To get summary statistics of numerical columns:
df.describe().show()
#To count the number of rows in the DataFrame:
print("Number of rows:", df.count())
These operations will help us understand the structure and content of the dataset, allowing us to formulate meaningful queries and gain insights.

Querying Data using SQL

One of the major strengths of Spark SQL is the ability to execute SQL queries on DataFrames. Let’s explore some interesting queries we can perform on our dataset:

Task 1: Top Reviewers
We can identify the top reviewers based on the number of reviews they have submitted. This can be achieved with the following SQL query:

df.createOrReplaceTempView("reviews")
query = "SELECT reviewerID, COUNT(*) as reviewCount FROM reviews GROUP BY reviewerID ORDER BY reviewCount DESC LIMIT 10"
top_reviewers = spark.sql(query)
top_reviewers.show()
This query counts the number of reviews submitted by each reviewer and retrieves the top 10 reviewers based on the review count.

Task 2: Sentiment Analysis
We can perform sentiment analysis on the review text to understand the overall sentiment expressed in the reviews. To accomplish this, we can utilize Spark SQL’s built-in functions and user-defined functions (UDFs):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a function that performs (very naive) sentiment analysis
def analyze_sentiment(text):
    # Null review texts arrive as None; return NULL instead of failing the task
    if text is None:
        return None
    # Add your sentiment analysis logic here
    return "positive" if text.count("good") > text.count("bad") else "negative"

# Wrap the Python function as a UDF and register it for use in SQL queries
sentiment_udf = udf(analyze_sentiment, StringType())
spark.udf.register("analyze_sentiment", sentiment_udf)

# Apply sentiment analysis on the review text (call the UDF, not the plain Python function)
df = df.withColumn("sentiment", sentiment_udf(df["reviewText"]))
In this example, we define a function called `analyze_sentiment` and wrap it as a UDF. Registering it with Spark SQL also makes it callable as analyze_sentiment(reviewText) inside SQL queries on the reviews view. Applying the UDF to the DataFrame adds a new column called "sentiment" that holds the sentiment of each review.
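Because the body of a Python UDF is ordinary Python, it can be checked without starting Spark. The sketch below is a slightly hardened variant of analyze_sentiment and is an assumption rather than part of the original solution. It returns None for a null reviewText and counts keywords case-insensitively. The keyword rule itself is still only a placeholder for real sentiment analysis.

```python
def analyze_sentiment(text):
    """Naive keyword-based sentiment label for one review (placeholder logic)."""
    # Spark passes SQL NULLs to a Python UDF as None; keep them as NULL.
    if text is None:
        return None
    lowered = text.lower()
    # More "good" than "bad" mentions counts as positive; ties are negative.
    return "positive" if lowered.count("good") > lowered.count("bad") else "negative"


# In Spark, wrap it exactly as before:
#   sentiment_udf = udf(analyze_sentiment, StringType())
#   df = df.withColumn("sentiment", sentiment_udf(df["reviewText"]))

print(analyze_sentiment("Good price, good quality"))  # positive
print(analyze_sentiment("Bad packaging"))              # negative
print(analyze_sentiment(None))                         # None
```

Keyword counting is easily fooled. For example, "not good" still counts as positive, so a real project would swap in a proper sentiment model behind the same function signature.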

Task 3: Average Ratings by Product Category
We can calculate the average rating for each product category to identify the highest-rated categories. The query below assumes the dataset also has a category column and stores the rating in a column named overall:

query = "SELECT category, AVG(overall) as averageRating FROM reviews GROUP BY category ORDER BY averageRating DESC"
average_ratings = spark.sql(query)
average_ratings.show()
This query calculates the average rating for each product category and orders the results by the average rating in descending order.
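You can sanity-check the two SQL queries without a cluster by running them unchanged against a tiny in-memory SQLite table. SQLite is only a stand-in for Spark SQL here. The five review rows, including the category and overall columns, are hypothetical sample data and do not come from the original dataset.

```python
import sqlite3

# Hypothetical sample rows: (reviewerID, productID, overall, category)
rows = [
    ("r1", "p1", 5, "Books"),
    ("r1", "p2", 4, "Books"),
    ("r2", "p3", 2, "Toys"),
    ("r1", "p3", 3, "Toys"),
    ("r3", "p1", 5, "Books"),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE reviews (reviewerID TEXT, productID TEXT, overall INTEGER, category TEXT)"
)
conn.executemany("INSERT INTO reviews VALUES (?, ?, ?, ?)", rows)

# Task 1: same query text as the Spark SQL version
top_reviewers = conn.execute(
    "SELECT reviewerID, COUNT(*) as reviewCount FROM reviews "
    "GROUP BY reviewerID ORDER BY reviewCount DESC LIMIT 10"
).fetchall()

# Task 3: same query text as the Spark SQL version
average_ratings = conn.execute(
    "SELECT category, AVG(overall) as averageRating FROM reviews "
    "GROUP BY category ORDER BY averageRating DESC"
).fetchall()
conn.close()

print(top_reviewers[0])  # ('r1', 3)
print(average_ratings)   # Books (about 4.67) first, then ('Toys', 2.5)
```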



In [None]:
Question 3. Spark Streaming:
a) Write a Python program to create a Spark Streaming application.

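Solution:- A sample Python Structured Streaming application that reads a Kafka topic and writes it to an in-memory table. The Kafka source is not bundled with Spark, so the spark-sql-kafka-0-10 package must be on the classpath (for example via spark-submit --packages).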

#!/usr/bin/env python
# coding: utf-8

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Connect to kafka server and read data stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "CHANGEME_KAFKA_SERVER") \
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='CHANGEME_USERNAME' password='CHANGEME_PASSWORD';") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.protocol", "TLSv1.2") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2") \
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS") \
.option("subscribe", "CHANGEME_TOPIC") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df.printSchema()

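# Write the input stream to an in-memory table named testk2s (the memory sink is meant for debugging small volumes)
query = df.writeStream.outputMode("append").format("memory").queryName("testk2s").option("partition.assignment.strategy", "range").start()

# Let the query run for up to 30 seconds, then stop it
query.awaitTermination(30)

query.stop()

print(query.status)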

# Query data
test_result=spark.sql("select * from testk2s")
test_result.show(5)

spark.sql("select count(*) from testk2s").show()
test_result_small = spark.sql("select * from testk2s limit 5")
test_result_small.show()

   

In [None]:
b) Configure the application to consume data from a streaming source (e.g., Kafka or a socket).
  
Solution:- Solution Approach
1. Create a SparkSession object.

2. Set up a Spark Streaming context with the appropriate configurations.

3. Define Kafka configuration properties, including the Kafka bootstrap servers, topic name, and any additional producer properties.


4. Create a DStream that represents the data stream to be processed (e.g., from a socket, Kafka source, or other streaming source).

5. Apply transformations and processing operations on the DStream to derive insights or perform calculations.


6. Use the Kafka producer API to write the processed data to a Kafka topic.

Code

# Import necessary libraries
# (pyspark.streaming.kafka.KafkaUtils was removed in Spark 3.0 and is not needed here)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer  # kafka-python package

# Create a SparkSession
spark = SparkSession.builder.appName("KafkaStreamingExample").getOrCreate()
# Set the batch interval for Spark Streaming (e.g., 1 second)
batch_interval = 1

# Create a Spark Streaming context
ssc = StreamingContext(spark.sparkContext, batch_interval)

# Define Kafka configuration properties
kafka_bootstrap_servers = "<kafka_bootstrap_servers>"
kafka_topic = "<kafka_topic>"
producer_properties = {
    # kafka-python expects Python-style keyword names such as bootstrap_servers
    "bootstrap_servers": kafka_bootstrap_servers
}

# Socket source to consume (placeholders: replace with your own host and port)
socket_host = "<hostname>"
socket_port = 9999

# Create a DStream that represents the data stream to be processed
data_stream = ssc.socketTextStream(socket_host, socket_port)

# Apply transformations and processing operations on the data stream
processed_stream = data_stream.flatMap(lambda line: line.split(" ")) \
                             .filter(lambda word: len(word) > 0)

# Kafka producer function that writes one partition of records to Kafka
# (one producer is created per partition, on the executor that processes it)
def write_to_kafka(partition):
    producer = KafkaProducer(**producer_properties)
    for record in partition:
        producer.send(kafka_topic, value=record.encode("utf-8"))
    producer.flush()  # make sure buffered messages are sent
    producer.close()

# Write the processed data to a Kafka topic using the Kafka producer API
processed_stream.foreachRDD(lambda rdd: rdd.foreachPartition(write_to_kafka))

# Start the streaming context
ssc.start()

# Await termination or stop the streaming context manually
ssc.awaitTermination()
Explanation
– First, we import the necessary libraries, including SparkSession, StreamingContext and kafka-python's KafkaProducer, to work with Spark Streaming and Kafka.

– We create a SparkSession object to provide a single entry point for Spark functionality.


– Next, we set the batch interval for the streaming context, which defines how often the streaming data is processed (e.g., 1 second).

– We create a StreamingContext object by passing the SparkContext and batch interval as parameters.

– We define the Kafka configuration properties, including the bootstrap servers.


– Using the appropriate streaming source (e.g., socketTextStream), we create a DStream that represents the data stream to be processed.

– We apply transformations on the DStream, such as splitting each line into words and filtering out empty words.

– Finally, we use the foreachRDD function to write the processed data to Kafka. The write_to_kafka function is defined to handle writing data to Kafka within each RDD partition.

 
Key Considerations:

– Ensure that the Spark Streaming and Kafka dependencies are correctly configured and available in your environment.

– Provide the appropriate Kafka bootstrap servers, topic name, and any additional producer properties.


– Consider scalability and resource allocation to handle increasing data volumes and processing requirements.

– Handle exceptions and ensure fault tolerance in case of failures or connectivity issues with Kafka.

In [None]:
c) Implement streaming transformations and actions to process and analyze the incoming data stream.

Solution:- A Spark transformation is a function that produces a new RDD from existing RDDs: it takes one or more RDDs as input and returns a new RDD as output. Because RDDs are immutable, a transformation never changes its input RDDs. Instead, Spark creates a new RDD each time we apply one.

Applying transformations builds an RDD lineage containing all the parent RDDs of the final RDD(s). The RDD lineage, also known as the RDD operator graph or RDD dependency graph, is a logical execution plan, i.e., a Directed Acyclic Graph (DAG) of all the parent RDDs of an RDD.

Transformations are lazy: they are not executed immediately, but only when an action is called. The two most basic transformations are map() and filter().
The resulting RDD can be smaller than its parent (e.g. filter(), distinct(), sample()), bigger (e.g. flatMap(), union(), cartesian()) or the same size (e.g. map()).

There are two types of transformations:

Narrow transformation – all the elements required to compute the records in a single output partition live in a single partition of the parent RDD. Only a limited subset of partitions is needed to compute the result, and no shuffle is required. map() and filter() are narrow transformations.

Wide transformation – the elements required to compute the records in a single output partition may live in many partitions of the parent RDD, so the data has to be shuffled across the cluster. groupByKey() and reduceByKey() are wide transformations.
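The difference between narrow and wide transformations can be sketched in plain Python, with no Spark installation needed. In this hypothetical model an RDD is a list of partitions, each a Python list. narrow_map and wide_shuffle are illustrative helpers, not Spark APIs. Routing by hash(key) % num_partitions only mirrors the idea behind Spark's default hash partitioning. Integer keys keep the output deterministic.

```python
def narrow_map(partitions, func):
    """Model of a narrow transformation such as map()."""
    # Output partition i is built from input partition i alone: no data moves.
    return [[func(x) for x in part] for part in partitions]


def wide_shuffle(partitions, num_out):
    """Model of the shuffle behind a wide transformation such as groupByKey()."""
    out = [[] for _ in range(num_out)]
    for part in partitions:
        for key, value in part:
            # Route each record by its key, so one output partition can
            # receive records from every input partition.
            out[hash(key) % num_out].append((key, value))
    return out


print(narrow_map([[1, 2], [3]], lambda x: x * 10))  # [[10, 20], [30]]

pairs = [[(1, "a"), (2, "b")], [(1, "c"), (3, "d")]]
print(wide_shuffle(pairs, 2))  # [[(2, 'b')], [(1, 'a'), (1, 'c'), (3, 'd')]]
```

The second output partition draws records from both input partitions. That cross-partition dependency is exactly what forces a shuffle.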

Spark offers many RDD transformations. Let us look at the most common ones with examples.

3.1. map(func)
The map() transformation applies a given function to every element of the RDD and returns a new RDD containing the results, with exactly one output element per input element.

With map(), the input and output element types may differ. For example, the input RDD can contain Strings while the RDD returned by map() contains Booleans.

For example, if we apply rdd.map(x => x + 2) to the RDD {1, 2, 3, 4, 5}, we get (3, 4, 5, 6, 7).


Map() example:

import org.apache.spark.sql.SparkSession

object mapTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
    val data = spark.read.textFile("spark_test.txt").rdd
    // Pair each line with its length
    val mapFile = data.map(line => (line, line.length))
    mapFile.foreach(println)
    spark.stop()
  }
}

Contents of spark_test.txt:

hello...user! this file is created to check the operations of spark.
?, and how can we apply functions on that RDD partitions?. All this will be done through spark programming which is done with the help of scala language support…
Note – In the above code, map() pairs each line of the file with its length.
3.2. flatMap()
With flatMap(), each input element can produce zero or more elements in the output RDD. The simplest use of flatMap() is to split each input string into words.
map() and flatMap() are similar in that they take an element of the input RDD and apply a function to it. The key difference is that map() returns exactly one element for each input element, while flatMap() returns a sequence whose elements are flattened into the output RDD.

flatMap() example:

val data = spark.read.textFile("spark_test.txt").rdd
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.foreach(println)

Note – In the above code, flatMap() splits each line at every space and emits the individual words.
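Here is a plain-Python sketch of the map()/flatMap() difference, with a list of strings standing in for an RDD of lines. The comments name the corresponding PySpark calls. No Spark is needed to run it.

```python
lines = ["hello spark", "map and flatMap"]

# Like rdd.map(lambda line: line.split(" ")): exactly one output per input
# line, so the result holds one list of words per line.
mapped = [line.split(" ") for line in lines]

# Like rdd.flatMap(lambda line: line.split(" ")): each line can yield several
# outputs, and they are flattened into a single sequence of words.
flat_mapped = [word for line in lines for word in line.split(" ")]

print(mapped)       # [['hello', 'spark'], ['map', 'and', 'flatMap']]
print(flat_mapped)  # ['hello', 'spark', 'map', 'and', 'flatMap']
```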
3.3. filter(func)
Spark RDD filter() function returns a new RDD, containing only the elements that meet a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.

For example, suppose an RDD contains the first five natural numbers (1, 2, 3, 4 and 5) and the predicate checks for an even number. The resulting RDD after the filter will contain only the even numbers, i.e., 2 and 4.

Filter() example:

val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value == "spark")
println(mapFile.count())

Note – In the above code, flatMap() splits each line into words, filter() keeps only the words equal to "spark" (the comparison is case-sensitive), and the count() action returns how many such words there are.

3.4. mapPartitions(func)
mapPartitions() is like map(), but the function is called once per partition (block) of the RDD instead of once per element. It receives an iterator over all the elements of a partition and returns an iterator with any number of results, possibly none. This is useful when setup work, such as opening a database connection, should happen once per partition rather than once per record.

3.5. mapPartitionsWithIndex()
It is like mapPartitions(), but the function also receives an integer index identifying the partition it is processing.
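The following plain-Python model (no Spark required) shows the key property of mapPartitions(): the function is invoked once per partition and receives an iterator. The partition layout is an assumption for illustration. The setup_calls list only works because everything runs locally; in Spark the function executes on the executors. In PySpark you could pass the same two functions directly to rdd.mapPartitions(...) and rdd.mapPartitionsWithIndex(...).

```python
setup_calls = []


def add_one_per_partition(iterator):
    # Runs once per partition: expensive setup (e.g. a connection) goes here.
    setup_calls.append("setup")
    for x in iterator:
        yield x + 1


def tag_with_index(index, iterator):
    # mapPartitionsWithIndex also passes the index of the partition.
    for x in iterator:
        yield (index, x)


partitions = [[1, 2], [3, 4], [5]]  # hypothetical layout of the RDD {1, 2, 3, 4, 5}

# Model of rdd.mapPartitions(add_one_per_partition)
result = [list(add_one_per_partition(iter(p))) for p in partitions]
# Model of rdd.mapPartitionsWithIndex(tag_with_index)
indexed = [list(tag_with_index(i, iter(p))) for i, p in enumerate(partitions)]

print(result)            # [[2, 3], [4, 5], [6]]
print(len(setup_calls))  # 3: once per partition, not once per element
print(indexed)           # [[(0, 1), (0, 2)], [(1, 3), (1, 4)], [(2, 5)]]
```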


3.6. union(dataset)
With the union() function, we get the elements of both RDDs in a new RDD, and duplicates are kept. The key rule of this function is that the two RDDs must have the same element type.
For example, if the elements of RDD1 are (Spark, Spark, Hadoop, Flink) and those of RDD2 are (Big data, Spark, Flink), then rdd1.union(rdd2) will have the elements (Spark, Spark, Hadoop, Flink, Big data, Spark, Flink).

Union() example:

val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(17,"sep",2015)))
val rdd3 = spark.sparkContext.parallelize(Seq((6,"dec",2011),(16,"may",2015)))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.foreach(println)

Note – In the above code, union() returns a new dataset that contains the elements of the source dataset (rdd1) together with those of the arguments (rdd2 and rdd3).
3.7. intersection(other-dataset)
With the intersection() function, we get only the elements common to both RDDs in a new RDD. The result contains no duplicates, even if the inputs did. The two RDDs must have the same element type.
Consider an example where the elements of RDD1 are (Spark, Spark, Hadoop, Flink) and those of RDD2 are (Big data, Spark, Flink). Then rdd1.intersection(rdd2) will have the elements (Spark, Flink).

Intersection() example:

val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(1,"jan",2016)))
val common = rdd1.intersection(rdd2)
common.foreach(println)

Note – The intersection() operation returns a new RDD containing the elements present in both rdd1 and rdd2, here (1,jan,2016).
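You can check the union and intersection examples with a plain-Python model of the string RDDs. List concatenation stands in for union() and a set intersection for intersection(). The result is sorted only to make the printout stable; Spark does not guarantee any order.

```python
from collections import Counter

rdd1 = ["Spark", "Spark", "Hadoop", "Flink"]
rdd2 = ["Big data", "Spark", "Flink"]

# Model of rdd1.union(rdd2): plain concatenation, duplicates are kept
union = rdd1 + rdd2

# Model of rdd1.intersection(rdd2): common elements, without duplicates
intersection = sorted(set(rdd1) & set(rdd2))

print(len(union), Counter(union)["Spark"])  # 7 3
print(intersection)                         # ['Flink', 'Spark']
```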

3.8. distinct()
It returns a new dataset that contains the distinct elements of the source dataset. It is helpful to remove duplicate data.
For example, if RDD has elements (Spark, Spark, Hadoop, Flink), then rdd.distinct() will give elements (Spark, Hadoop, Flink).

Distinct() example:

val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014),(3,"nov",2014)))
val result = rdd1.distinct()
println(result.collect().mkString(", "))

Note – In the above example, distinct() removes the duplicate record (3,"nov",2014).
3.9. groupByKey()
When we use groupByKey() on a dataset of (K, V) pairs, the data is shuffled according to the key K into a new RDD of (K, Iterable<V>) pairs. Every pair is sent over the network, so this transformation can transfer a lot of unnecessary data.

Spark can spill data to disk when more data is shuffled onto a single executor than fits in memory.

groupByKey() example:

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)

Note – groupByKey() groups the integers by their key (a character). The collect() action then returns all the grouped elements to the driver as an Array.
3.10. reduceByKey(func, [numTasks])
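When we use reduceByKey() on a dataset of (K, V) pairs, the values for each key are merged with the given function. Pairs with the same key in the same partition are combined before the data is shuffled, which is why reduceByKey() usually moves much less data than groupByKey().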

reduceByKey() example:

val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)
data.foreach(println)

Note – The above code parallelizes the Array of Strings and maps each word to the pair (word, 1). reduceByKey() then adds up the counts of pairs that have the same key.
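To see why reduceByKey() shuffles less data than groupByKey(), here is a plain-Python model of the word-count example. It assumes the ten words are split into two partitions of five. reduce_by_key and group_by_key_shuffle_size are hypothetical helpers that count the records that would cross the shuffle; they are not Spark APIs.

```python
def group_by_key_shuffle_size(partitions):
    # groupByKey(): every (key, value) pair is sent through the shuffle.
    return sum(len(part) for part in partitions)


def reduce_by_key(partitions, func):
    # Step 1 (map side): combine values per key inside each partition.
    combined = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        combined.append(list(local.items()))
    shuffle_size = sum(len(part) for part in combined)
    # Step 2 (after the shuffle): merge the partial results per key.
    final = {}
    for part in combined:
        for key, value in part:
            final[key] = func(final[key], value) if key in final else value
    return final, shuffle_size


words = ["one", "two", "two", "four", "five", "six", "six", "eight", "nine", "ten"]
pairs = [(w, 1) for w in words]
partitions = [pairs[:5], pairs[5:]]  # assumption: two partitions of five

counts, shuffled = reduce_by_key(partitions, lambda a, b: a + b)
print(counts["two"], counts["six"])                     # 2 2
print(shuffled, group_by_key_shuffle_size(partitions))  # 8 10
```

The saving grows with the number of repeated keys per partition. With many occurrences of the same word, reduceByKey() ships one partial count per key per partition instead of every pair.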

3.11. sortByKey()
When we apply the sortByKey() function on a dataset of (K, V) pairs, it returns a new RDD with the pairs sorted by the key K.

sortByKey() example:

val data = spark.sparkContext.parallelize(Seq(("maths",52), ("english",75), ("science",82), ("computer",65), ("maths",85)))
val sorted = data.sortByKey()
// collect() first so that the pairs are printed in sorted order on the driver
sorted.collect().foreach(println)

Note – In the above code, sortByKey() sorts the data RDD in ascending order of its String key. Both ("maths",52) and ("maths",85) are kept, since sortByKey() does not merge duplicate keys.

3.12. join()
Join is a database term: it combines the fields of two tables using common values. In Spark, the join() operation is defined on pair RDDs, i.e., RDDs whose elements are tuples where the first element is the key and the second is the value.

The advantage of keyed data is that records can be combined by key. The join() operation combines two datasets on the basis of the key and returns (K, (V, W)) pairs for every matching combination of values.

Join() example:

val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 = spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))

Note – The join() transformation performs an inner join of the two RDDs on their keys. Key 'A' matches two records in data2, so it appears twice in the result.
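Here is a plain-Python model of the pair-RDD inner join on the data from the example above. inner_join is a hypothetical helper, not a Spark API. Spark does not guarantee the order of the joined records; the model simply emits them in the order of the left input.

```python
from collections import defaultdict


def inner_join(left, right):
    """Model of pairRDD.join(other): (k, (v, w)) for every matching pair of values."""
    # Index the right side by key.
    by_key = defaultdict(list)
    for key, value in right:
        by_key[key].append(value)
    # Emit one record per (left value, right value) combination; keys
    # missing from either side produce nothing (inner join).
    return [(key, (lv, rv)) for key, lv in left for rv in by_key.get(key, [])]


data = [("A", 1), ("b", 2), ("c", 3)]
data2 = [("A", 4), ("A", 6), ("b", 7), ("c", 3), ("c", 8)]
print(inner_join(data, data2))
# [('A', (1, 4)), ('A', (1, 6)), ('b', (2, 7)), ('c', (3, 3)), ('c', (3, 8))]
```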

3.13. coalesce()
The coalesce() function reduces the number of partitions while avoiding a full shuffle. Instead of redistributing every record, it merges existing partitions. For example, if we have four partitions and want only two, the data of the dropped partitions is combined into the partitions we keep.

Coalesce() example:

val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)

Note – coalesce() decreases the number of partitions of the source RDD to the number given as its argument (here 2).
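Here is a plain-Python model of coalesce() without a shuffle, assuming the six months are split into three partitions of two. The contiguous grouping is a simplification, since Spark's actual grouping of parent partitions also takes data locality into account. To increase the number of partitions, PySpark offers rdd.repartition(n), which is coalesce with shuffle=True.

```python
def coalesce(partitions, num_partitions):
    """Model of rdd.coalesce(n) without a shuffle: merge whole partitions."""
    if num_partitions >= len(partitions):
        # Without a shuffle, coalesce cannot increase the partition count.
        return [list(p) for p in partitions]
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        # Contiguous grouping: each parent partition is moved as a whole.
        merged[i * num_partitions // len(partitions)].extend(part)
    return merged


parts = [["jan", "feb"], ["mar", "april"], ["may", "jun"]]
print(coalesce(parts, 2))  # [['jan', 'feb', 'mar', 'april'], ['may', 'jun']]
print(coalesce(parts, 5))  # unchanged: still 3 partitions
```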
4. RDD Actions
An action triggers the execution of the lazy transformations and returns a result from the executors to the driver, or writes it to external storage.
Executors are agents that are responsible for executing tasks,
while the driver is a JVM process that coordinates the workers and the execution of the tasks.
Some of the actions of Spark are:

4.1. count()
Action count() returns the number of elements in RDD.

For example, if an RDD has the values {1, 2, 2, 3, 4, 5, 5, 6}, rdd.count() gives the result 8.

Count() example:

val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value == "spark")
println(mapFile.count())

Note – In the above code, flatMap() maps each line into words, filter() keeps the words equal to "spark", and the count() action returns how many there are.

4.2. collect()
The collect() action is the most common and simplest operation: it returns the entire content of an RDD to the driver program. A typical use of collect() is unit testing, where the entire RDD is expected to fit in memory. This makes it easy to compare the result of an RDD with the expected result.
The constraint of collect() is that all the data is copied to the driver, so it must fit in the memory of that single machine.

Collect() example:

val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 = spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))

Note – join() in the above code joins the two RDDs on their common key (a character). The collect() action then returns all the elements of the result to the driver as an Array.
4.3. take(n)
The action take(n) returns the first n elements of the RDD. It tries to limit the number of partitions it reads: it scans the first partition and then only as many further partitions as needed. As a result, the output follows partition order and is not a random sample.

For example, consider an RDD {1, 2, 2, 3, 4, 5, 5, 6}; take(4) will give the result {1, 2, 2, 3}.

Take() example:

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)

val twoRec = data.take(2)

twoRec.foreach(println)

Note – take(2) returns an array with the first 2 elements of the dataset, here (k,5) and (s,3).

4.4. top()
If the elements of our RDD have an ordering, we can extract the largest elements using top(). top(n) uses the default (implicit) ordering of the element type and returns the n largest elements in descending order.

Top() example:

val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.map(line => (line,line.length))
val res = mapFile.top(3)
res.foreach(println)

Note – map() pairs each line with its length, and top(3) returns the 3 largest records of mapFile under the default tuple ordering. That ordering compares the line text first and the length second.
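The take() and top() semantics can be modelled in plain Python, assuming the RDD {1, 2, 2, 3, 4, 5, 5, 6} is split into the three partitions shown. Real Spark scans the first partition and then estimates how many more partitions to read. This simplified take() reads one partition at a time instead, and heapq.nlargest stands in for top().

```python
import heapq


def take(partitions, n):
    """Model of rdd.take(n): read partitions in order, stop once n are found."""
    result, scanned = [], 0
    for part in partitions:
        if len(result) >= n:
            break
        scanned += 1
        result.extend(part[: n - len(result)])
    return result, scanned


partitions = [[1, 2, 2], [3, 4, 5], [5, 6]]  # assumed layout of {1, 2, 2, 3, 4, 5, 5, 6}
print(take(partitions, 4))  # ([1, 2, 2, 3], 2): the third partition is never read

# Model of rdd.top(3): the 3 largest elements, in descending order
all_elements = [x for part in partitions for x in part]
print(heapq.nlargest(3, all_elements))  # [6, 5, 5]
```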
4.5. countByValue()
countByValue() returns how many times each distinct element occurs in the RDD.

For example, if an RDD has the values {1, 2, 2, 3, 4, 5, 5, 6}, rdd.countByValue() will give the result {(1,1), (2,2), (3,1), (4,1), (5,2), (6,1)}.

countByValue() example:

val data = spark.read.textFile("spark_test.txt").rdd
val result = data.map(line => (line,line.length)).countByValue()
result.foreach(println)

Note – countByValue() returns a Map on the driver from each distinct element to the number of times it occurs (a Long). Here, each element is a (line, length) tuple.

4.6. reduce()
The reduce() function takes two elements of the RDD at a time and combines them into an output of the same type as the input elements. The simplest example of such a function is addition, which we can use to add up the elements of an RDD or to count words. The function must be commutative and associative, because Spark first reduces each partition and then combines the partial results.

Reduce() example:

val rdd1 = spark.sparkContext.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
println(sum)

Note – The reduce() action in the above code adds up the elements of the source RDD.
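A plain-Python model shows why the function passed to reduce() must be associative and commutative. Like Spark, it reduces each partition first and then combines the partial results. The two partitionings of the example list are assumptions for illustration, and spark_like_reduce is a hypothetical helper.

```python
import operator
from functools import reduce


def spark_like_reduce(partitions, func):
    """Model of rdd.reduce(func): reduce each partition, then the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    if not partials:
        # Like Spark, refuse to reduce an empty dataset.
        raise ValueError("cannot reduce an empty RDD")
    return reduce(func, partials)


two_parts = [[20, 32, 45], [62, 8, 5]]
three_parts = [[20, 32], [45, 62], [8, 5]]

# Addition is associative and commutative: partitioning does not matter.
print(spark_like_reduce(two_parts, operator.add), spark_like_reduce(three_parts, operator.add))  # 172 172
# Subtraction is not: the answer depends on how the data is partitioned.
print(spark_like_reduce(two_parts, operator.sub), spark_like_reduce(three_parts, operator.sub))  # -106 2
```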
4.7. fold()
The signature of fold() is like that of reduce(), but it additionally takes a "zero value". This value is used as the initial value for each partition and again when the per-partition results are combined, so it should be the identity element of the operation. Another difference is that reduce() throws an exception for an empty collection, while fold() is defined for an empty collection and returns the zero value.

For example, zero is the identity element for addition, and one is the identity element for multiplication. The return type of fold() is the same as the element type of the RDD we are operating on.
For example, rdd.fold(0)((x, y) => x + y).

Fold() example:

val rdd1 = spark.sparkContext.parallelize(List(("maths", 80),("science", 90)))
val additionalMarks = ("extra", 4)
val sum = rdd1.fold(additionalMarks){ (acc, marks) =>
  val add = acc._2 + marks._2
  ("total", add)
}
println(sum)

Note – In the above code additionalMarks is the zero value. Because 4 is not the identity element for addition, it is added once per partition plus once more when the partition results are combined. The printed total is therefore 170 plus 4 times (number of partitions + 1), which depends on how the RDD is partitioned.
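A plain-Python model of fold() reproduces the effect of a non-identity zero value. Each partition is folded starting from the zero value, and the partition results are folded again starting from the zero value. The partitionings are assumptions for illustration, spark_like_fold is a hypothetical helper, and plain integers stand in for the marks tuples.

```python
import operator


def spark_like_fold(partitions, zero, op):
    """Model of rdd.fold(zero)(op): fold each partition from zero, then fold the results from zero."""
    partials = []
    for part in partitions:
        acc = zero  # empty partitions contribute the zero value itself
        for x in part:
            acc = op(acc, x)
        partials.append(acc)
    result = zero
    for p in partials:
        result = op(result, p)
    return result


marks = [80, 90]
print(spark_like_fold([marks], 0, operator.add))           # 170 (identity zero)
print(spark_like_fold([[80], [90]], 0, operator.add))      # 170, for any partitioning
print(spark_like_fold([marks], 4, operator.add))           # 178 = 170 + 4 * (1 + 1)
print(spark_like_fold([[80], [90], []], 4, operator.add))  # 186 = 170 + 4 * (3 + 1)
```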

4.8. aggregate()
aggregate() gives us the flexibility to return a data type different from the element type. It takes a zero value and two functions. The first (seqOp) combines an element of the RDD with a per-partition accumulator, and the second (combOp) combines two accumulators. Hence, in aggregate() we supply an initial zero value of the type we want to return.
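Here is a plain-Python model of aggregate() computing an average. The accumulator is a (sum, count) tuple and therefore has a different type from the integer elements. spark_like_aggregate and the partition layout are hypothetical. In PySpark the equivalent call would be rdd.aggregate((0, 0), seq_op, comb_op).

```python
def spark_like_aggregate(partitions, zero, seq_op, comb_op):
    """Model of rdd.aggregate(zero, seqOp, combOp)."""
    partials = []
    for part in partitions:
        acc = zero
        for x in part:
            acc = seq_op(acc, x)  # fold an element into this partition's accumulator
        partials.append(acc)
    result = zero
    for p in partials:
        result = comb_op(result, p)  # merge accumulators from different partitions
    return result


def seq_op(acc, x):
    # Add one element to a (sum, count) accumulator.
    return (acc[0] + x, acc[1] + 1)


def comb_op(a, b):
    # Merge two (sum, count) accumulators.
    return (a[0] + b[0], a[1] + b[1])


total, count = spark_like_aggregate([[1, 2, 3], [4, 5]], (0, 0), seq_op, comb_op)
print(total, count, total / count)  # 15 5 3.0
```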

4.9. foreach()
foreach() is useful when we want to apply an operation to each element of an RDD without returning any value to the driver, for example inserting each record into a database. The function runs on the executors, so on a cluster its println output appears in the executor logs rather than on the driver.

Foreach() example:

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey()
group.foreach(println)

Note – The foreach() action runs a function (println) on each element of the RDD group. Calling foreach after collect() would instead loop over a local Array on the driver.

In [None]:
Question 4. Spark SQL and Data Source Integration:

a) Write a Python program to connect Spark with a relational database (e.g., MySQL, PostgreSQL).

Solution:- Connecting PySpark application with a locally installed MySQL database
Before writing your script, you need to put the MySQL JDBC driver JAR file into PySpark's jars directory:
/home/<username>/.local/lib/python3.8/site-packages/pyspark/jars/
mysql-connector-java-8.0.22.jar

Download the JAR file, for example from jar-download.com.

Then configure the Spark session:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars", "mysql-connector-java-8.0.22.jar") \
    .master("local") \
    .appName("PySpark_MySQL_test") \
    .getOrCreate()


jdbcDF = spark.read.format("jdbc") \
    .options(
        url="jdbc:mysql://localhost/<database_name>",
        # Connector/J 8.x driver class (com.mysql.jdbc.Driver is the deprecated 5.x name)
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="<table_name>",
        user="<user_name>",
        password="<password>") \
    .load()
jdbcDF.show()



Connecting PySpark with a locally installed Postgres RDB
Here you need to put the Postgres jar file inside the jars directory
/home/<username>/.local/lib/python3.8/site-packages/pyspark/jars/
postgresql-42.2.14.jar

Download the JAR file, for example from jdbc.postgresql.org.

Then configure the Spark session:

from pyspark.sql import SparkSession

# the Spark session should be instantiated as follows
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "postgresql-42.2.14.jar") \
    .getOrCreate()
    
jdbcDF = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://localhost:5432/<database_name>', 
         dbtable='<table_name>',
         user='<user_name>',
         password='<user_password>',
         driver='org.postgresql.Driver').\
load()

Connecting PySpark with Postgres RDS

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "postgresql-42.2.14.jar") \
    .getOrCreate()

jdbcDF = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://nbj-instance.crjyrm6gyucn.-1.rds.amazonaws.com/<database_name>', 
         dbtable='<table_name>',
         user='<user_name>',
         password='<password>',
         driver='org.postgresql.Driver').\
load()
jdbcDF.show()




In [None]:
b)Perform SQL operations on the data stored in the database using Spark SQL.

Solution:- The main classes and modules of PySpark's SQL module are:

pyspark.sql.SparkSession – SparkSession is the main entry point for DataFrame and SQL functionality.
pyspark.sql.DataFrame – A distributed collection of data organized into named columns.
pyspark.sql.Column – A column expression in a DataFrame.
pyspark.sql.Row – A row of data in a DataFrame.
pyspark.sql.GroupedData – The object returned by DataFrame.groupBy(), used for aggregations.
pyspark.sql.DataFrameNaFunctions – Methods for handling missing data (null values).
pyspark.sql.DataFrameStatFunctions – Methods for statistics functionality.
pyspark.sql.functions – The standard built-in functions.
pyspark.sql.types – The SQL data types available in Spark.
pyspark.sql.Window – Used to define windows for window functions.
Regardless of which approach you use, you have to create a SparkSession, which is the entry point to the Spark application.

3. Running SQL Queries in Spark
Spark SQL is one of the most widely used Spark modules; it processes structured data organized into named columns. Once you have created a DataFrame, you can interact with the data using SQL syntax.

In other words, Spark SQL lets you run traditional ANSI SQL queries natively on Spark DataFrames. The examples below cover SELECT, WHERE, ORDER BY, GROUP BY, JOIN, and UNION.

To use SQL, first create a temporary view on the DataFrame with the createOrReplaceTempView() function. Once created, the view can be queried with sql() anywhere in that SparkSession, and it is dropped when the SparkSession terminates.

Use the sql() method of the SparkSession object to run a query; it returns the result as a new DataFrame.

4. Spark SQL Examples
4.1 Create SQL View
Create a DataFrame from a CSV file. You can find this CSV file in the accompanying GitHub project.


// Read CSV file into table
val df = spark.read.option("header",true) 
          .csv("/Users/admin/simple-zipcodes.csv")
df.printSchema()
df.show()

To run ANSI SQL queries as you would on an RDBMS, register the data read from the CSV file as a temporary view:

// Read CSV file into table
spark.read.option("header",true) 
          .csv("/Users/admin/simple-zipcodes.csv") 
          .createOrReplaceTempView("Zipcodes")
        
        
Spark SQL to Select Columns
The select() function of DataFrame API is used to select the specific columns from the DataFrame.


// DataFrame API Select query
df.select("country","city","zipcode","state") 
     .show(5)
In SQL, you can achieve the same using SELECT FROM clause as shown below.


// SQL Select query
spark.sql("SELECT country, city, zipcode, state FROM ZIPCODES") 
     .show(5)
Both examples above return the same result: the first five rows of the selected columns.

Filter Rows
To filter the rows from the data, you can use where() function from the DataFrame API.


// DataFrame API where()
df.select("country","city","zipcode","state") 
  .where("state == 'AZ'") 
  .show(5)
Similarly, in SQL you can use WHERE clause as follows.


// SQL where
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES 
          WHERE state = 'AZ' """) 
     .show(5)


Sorting
To sort rows on a specific column, use the orderBy() function of the DataFrame API.



// sorting
df.select("country","city","zipcode","state") 
  .where("state in ('PR','AZ','FL')") 
  .orderBy("state") 
  .show(10)
In SQL, you can achieve sorting by using ORDER BY clause.


// SQL ORDER BY
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES 
          WHERE state in ('PR','AZ','FL') order by state """) 
     .show(10)

Grouping
Use groupBy().count() to group a DataFrame by a column and count the rows in each group.


// grouping
df.groupBy("state").count() 
  .show()
In Spark SQL, you achieve the same with the GROUP BY clause.



// SQL GROUP BY clause
spark.sql(""" SELECT state, count(*) as count FROM ZIPCODES 
          GROUP BY state""") 
     .show()
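Because these are standard ANSI SQL clauses, the query logic can be checked without a cluster. The sketch below runs the filter, sort, and group-by queries above against Python's built-in sqlite3 module, used here only as a stand-in for Spark SQL. The ZIPCODES rows are hypothetical toy data, not the contents of simple-zipcodes.csv.

```python
import sqlite3

# Hypothetical toy rows (NOT the contents of simple-zipcodes.csv), using the
# same column names as the queries above.
rows = [
    ("US", "city_a", "00001", "AZ"),
    ("US", "city_b", "00002", "AZ"),
    ("US", "city_c", "00003", "PR"),
    ("US", "city_d", "00004", "FL"),
    ("US", "city_e", "00005", "NY"),
]

conn = sqlite3.connect(":memory:")  # in-memory database standing in for the temp view
conn.execute("CREATE TABLE ZIPCODES (country TEXT, city TEXT, zipcode TEXT, state TEXT)")
conn.executemany("INSERT INTO ZIPCODES VALUES (?, ?, ?, ?)", rows)

# Filter: same WHERE clause as the Spark SQL example
az_rows = conn.execute(
    "SELECT country, city, zipcode, state FROM ZIPCODES WHERE state = 'AZ'"
).fetchall()

# Sort: same IN filter and ORDER BY as the sorting example
sorted_states = [row[3] for row in conn.execute(
    "SELECT country, city, zipcode, state FROM ZIPCODES "
    "WHERE state in ('PR','AZ','FL') order by state"
)]

# Group: same GROUP BY query as the grouping example
state_counts = dict(conn.execute(
    "SELECT state, count(*) as count FROM ZIPCODES GROUP BY state"
).fetchall())

conn.close()
print(len(az_rows), sorted_states, state_counts)
```

SQLite and Spark SQL differ in many details (types, functions, case sensitivity), so this only checks the logic of simple queries like these; the queries themselves should still be run with spark.sql() on the real view.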

SQL Join Operations
If you have two DataFrames, you can join them in Spark. The examples below use an employee DataFrame (empDF) and a department DataFrame (deptDF); all of their rows appear in the full outer join result further down.
Spark DataFrames support all basic SQL join types: INNER, FULL OUTER, LEFT OUTER, RIGHT OUTER, LEFT SEMI, LEFT ANTI, and CROSS. A self join is simply one of these types applied to a DataFrame and itself.
Joins are wide transformations that usually shuffle data across the network, so they can be expensive when not designed with care.

On the other hand, DataFrame and Dataset joins benefit from Spark SQL's query optimizer by default, although some performance issues still need attention.
2. Inner Join
Inner join is Spark's default join type and the most commonly used one. It joins two DataFrames/Datasets on key columns; rows whose keys don’t match are dropped from both datasets (emp and dept).


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)
Applying the inner join to our datasets drops “emp_dept_id” 50 from “emp” and “dept_id” 30 from “dept”. Below is the result of the above join expression.


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
3. Full Outer Join
An outer join (also called full or fullouter) returns all rows from both Spark DataFrames/Datasets; where the join expression doesn’t match, it returns null in the columns of the side that has no matching record.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"outer")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"full")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"fullouter")
    .show(false)
In our “emp” dataset, “emp_dept_id” 50 has no record in “dept”, so the dept columns are null; “dept_id” 30 has no record in “emp”, so the emp columns are null. Below is the result of the above join expression.


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
4. Left Outer Join
A left join (also called left outer) returns all rows from the left DataFrame/Dataset whether or not a match is found in the right dataset. When the join expression doesn’t match, it assigns null to the right-side columns of that record, and it drops right-side records that have no match.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"left")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftouter")
    .show(false)
In our dataset, “emp_dept_id” 50 has no record in the “dept” dataset, so this record contains null in the “dept” columns (dept_name and dept_id), and “dept_id” 30 from the “dept” dataset is dropped from the results. Below is the result of the above join expression.


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
5. Right Outer Join
A right join (also called right outer) is the opposite of a left join: it returns all rows from the right DataFrame/Dataset whether or not a match is found in the left dataset. When the join expression doesn’t match, it assigns null to the left-side columns of that record, and it drops left-side records that have no match.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"right")
   .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"rightouter")
   .show(false)
In our example, “dept_id” 30 in the right dataset has no match in the left dataset “emp”, so this record contains null in the “emp” columns, and “emp_dept_id” 50 is dropped because it has no match in the right dataset. Below is the result of the above join expression.


+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
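To see why each of the four joins above returns the rows shown, here is a plain-Python model (not Spark code) of the inner, left, right, and full outer joins on emp_dept_id == dept_id. The rows are copied from the result tables above with only a few columns kept; `join` is a hypothetical nested-loop helper, and it does not reproduce Spark's row order, which Spark does not guarantee anyway.

```python
# emp and dept rows copied from the join results above (only a few columns kept)
emp = [
    {"emp_id": 1, "name": "Smith", "emp_dept_id": 10},
    {"emp_id": 2, "name": "Rose", "emp_dept_id": 20},
    {"emp_id": 3, "name": "Williams", "emp_dept_id": 10},
    {"emp_id": 4, "name": "Jones", "emp_dept_id": 10},
    {"emp_id": 5, "name": "Brown", "emp_dept_id": 40},
    {"emp_id": 6, "name": "Brown", "emp_dept_id": 50},
]
dept = [
    {"dept_name": "Finance", "dept_id": 10},
    {"dept_name": "Marketing", "dept_id": 20},
    {"dept_name": "Sales", "dept_id": 30},
    {"dept_name": "IT", "dept_id": 40},
]
EMP_NULLS = {key: None for key in emp[0]}    # null-filled emp columns
DEPT_NULLS = {key: None for key in dept[0]}  # null-filled dept columns


def join(left, right, how):
    """Hypothetical nested-loop model of emp_dept_id == dept_id joins."""
    out, matched_right = [], set()
    for l_row in left:
        hits = [i for i, r_row in enumerate(right) if l_row["emp_dept_id"] == r_row["dept_id"]]
        for i in hits:
            out.append({**l_row, **right[i]})  # matched pair
            matched_right.add(i)
        if not hits and how in ("left", "full"):
            out.append({**l_row, **DEPT_NULLS})  # keep unmatched left row
    if how in ("right", "full"):
        for i, r_row in enumerate(right):
            if i not in matched_right:
                out.append({**EMP_NULLS, **r_row})  # keep unmatched right row
    return out


results = {how: join(emp, dept, how) for how in ("inner", "left", "right", "full")}
for how, rows in results.items():
    print(how, len(rows))
```

The row counts match the tables above: 5 for inner, 6 for left, 6 for right, and 7 for full outer. Spark itself chooses a much faster physical strategy (for example, a sort-merge or broadcast hash join); the nested loop is only meant to make the matching rules visible.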
6. Left Semi Join
A left semi join is similar to an inner join, except that it returns only the columns of the left DataFrame/Dataset and ignores all columns of the right dataset. In other words, it returns the left rows that have a match in the right dataset on the join expression; rows without a match are ignored on both sides.

A similar result can be achieved by selecting the left columns from an inner join, but the semi join is more efficient and returns each left row at most once, even when its key matches several right rows.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftsemi")
    .show(false)
Below is the result of the above join expression.


+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
7. Left Anti Join
A left anti join does the exact opposite of a left semi join: it returns only the columns of the left DataFrame/Dataset, for the left rows that have no match in the right dataset.


  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftanti")
    .show(false)
This yields the output below.


+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
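The semi and anti joins can be modelled in a few lines of plain Python (a sketch, not Spark's implementation): both keep only left-side columns, and together they split the left rows into matched and unmatched. The rows are copied from the tables above with only three columns kept.

```python
# (emp_id, name, emp_dept_id) copied from the emp rows shown above
emp = [(1, "Smith", 10), (2, "Rose", 20), (3, "Williams", 10),
       (4, "Jones", 10), (5, "Brown", 40), (6, "Brown", 50)]
# dept_id values of the dept dataset (Finance, Marketing, Sales, IT)
dept_ids = {10, 20, 30, 40}

# Left semi: left rows WITH a match, left columns only, each row at most once
semi = [row for row in emp if row[2] in dept_ids]

# Left anti: left rows WITHOUT a match, left columns only
anti = [row for row in emp if row[2] not in dept_ids]

print([row[0] for row in semi])  # [1, 2, 3, 4, 5]
print(anti)                      # [(6, 'Brown', 50)]
```

Testing membership in a set of right-side keys is why a semi join never duplicates a left row: the question asked per row is only "is there a match?", not "which matches are there?".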
8. Self Join
Spark has no dedicated self-join type, but any of the join types explained above can join a DataFrame to itself. The example below uses an inner self join.


  import org.apache.spark.sql.functions.col

  empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
      .show(false)
Here, the emp dataset is joined with itself to find the superior’s emp_id and name for each employee. Smith has superior_emp_id -1, which matches no emp_id, so the inner join leaves him out.


+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
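The same self join can be sketched in plain Python (not Spark code): the "emp2" side becomes a lookup from emp_id to name, and the inner join keeps only employees whose superior_emp_id is found in it. The rows are copied from the emp table above; the lookup assumes emp_id is unique.

```python
# (emp_id, name, superior_emp_id) copied from the emp rows above
emp = [(1, "Smith", -1), (2, "Rose", 1), (3, "Williams", 1),
       (4, "Jones", 2), (5, "Brown", 2), (6, "Brown", 2)]

# The "emp2" side of the self join, keyed by emp_id (assumes emp_id is unique)
name_by_id = {emp_id: name for emp_id, name, _ in emp}

# Inner self join: employees whose superior_emp_id exists as an emp_id
superiors = [
    (emp_id, name, sup_id, name_by_id[sup_id])
    for emp_id, name, sup_id in emp
    if sup_id in name_by_id
]

for row in superiors:
    print(row)
```

The five resulting tuples correspond to the five rows of the table above, with Smith excluded because -1 is not an emp_id.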
9. Using SQL Expression
Since Spark SQL supports native SQL syntax, you can also write join operations with spark.sql() after creating temporary views on the DataFrames.


  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)


Union

DataFrame union() – the union() method combines two DataFrames with the same structure/schema. Columns are matched by position, not by name, and Spark raises an error if the two DataFrames have a different number of columns.

DataFrame unionAll() – unionAll() is deprecated since Spark “2.0.0” version and replaced with union().

Note: In standard SQL, UNION eliminates duplicates while UNION ALL keeps them.
In Spark, the DataFrame union() and unionAll() methods behave the same and both keep duplicates; use the DataFrame distinct() or dropDuplicates() function to remove duplicate rows.

    

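The first DataFrame, df, is created in the complete example at the end of this section. Its printSchema() and show() output is:

root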
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = false)
 |-- age: integer (nullable = false)
 |-- bonus: integer (nullable = false)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+

Now, let’s create a second Dataframe with the new records and some records from the above Dataframe but with the same schema.


  val simpleData2 = Seq(("James","Sales","NY",90000,34,10000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  )
  val df2 = simpleData2.toDF("employee_name","department","state","salary","age","bonus")
Displaying df2 with df2.show(false) yields the output below.


+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

Combine two or more DataFrames using union
The DataFrame union() method combines two DataFrames and returns a new DataFrame with all rows from both, including duplicates.


  val df3 = df.union(df2)
  df3.show(false)
As shown below, it returns all nine records, duplicates included.


+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
Combine DataFrames using unionAll
The DataFrame unionAll() method has been deprecated since Spark 2.0.0, and union() is recommended instead.



  val df4 = df.unionAll(df2)
  df4.show(false)
Returns the same output as above.

Combine without Duplicates
Because union() keeps duplicate rows, apply the distinct() function to the result to keep just one copy of each duplicate record.


  val df5 = df.union(df2).distinct()
  df5.show(false)
This yields the output below; only the seven distinct rows remain.


+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
+-------------+----------+-----+------+---+-----+
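As a plain-Python sketch of the union semantics described above (not Spark code), the rows of df and df2 can be modelled as tuples: union() appends one list to the other, and distinct() keeps one copy of each identical row.

```python
# Rows copied from df and df2 above:
# (employee_name, department, state, salary, age, bonus)
df_rows = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
]
df2_rows = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]

# union()/unionAll(): append rows, keep duplicates (UNION ALL semantics)
union_all = df_rows + df2_rows

# distinct(): keep one copy of each identical row
# (dict.fromkeys keeps first-seen order; Spark does not guarantee any order)
distinct_rows = list(dict.fromkeys(union_all))

print(len(union_all), len(distinct_rows))  # 9 7
```

Because the tuples carry no column names, concatenation lines values up purely by position, which mirrors how union() resolves columns.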
Complete Example of DataFrame Union

package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.SparkSession

object UnionExample extends App{

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  val simpleData = Seq(("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000)
  )
  val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
  df.printSchema()
  df.show()

  val simpleData2 = Seq(("James","Sales","NY",90000,34,10000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  )
  val df2 = simpleData2.toDF("employee_name","department","state","salary","age","bonus")
  df2.show(false)

  val df3 = df.union(df2)
  df3.show(false)
  df3.distinct().show(false)

  val df4 = df.unionAll(df2)
  df4.show(false)
}

    

c) Explore the integration capabilities of Spark with other data sources, such as Hadoop Distributed File System (HDFS) or Amazon S3.

Solution:- First, Spark is intended to enhance, not replace, the Hadoop stack. From day one, Spark was designed to read and write data from and to HDFS, as well as other storage systems, such as HBase and Amazon’s S3. As such, Hadoop users can enrich their processing capabilities by combining Spark with Hadoop MapReduce, HBase, and other big data frameworks.

Second, we have constantly focused on making it as easy as possible for every Hadoop user to take advantage of Spark’s capabilities. No matter whether you run Hadoop 1.x or Hadoop 2.0 (YARN), and no matter whether you have administrative privileges to configure the Hadoop cluster or not, there is a way for you to run Spark! In particular, there are three ways to deploy Spark in a Hadoop cluster: standalone, YARN, and SIMR.

Standalone deployment: With the standalone deployment one can statically allocate resources on all or a subset of machines in a Hadoop cluster and run Spark side by side with Hadoop MR. The user can then run arbitrary Spark jobs on their HDFS data. Its simplicity makes this the deployment of choice for many Hadoop 1.x users.

Hadoop YARN deployment: Hadoop users who have already deployed or are planning to deploy Hadoop YARN can simply run Spark on YARN without any pre-installation or administrative access required. This allows users to easily integrate Spark in their Hadoop stack and take advantage of the full power of Spark, as well as of other components running on top of Spark.

Spark In MapReduce (SIMR): For the Hadoop users that are not running YARN yet, another option, in addition to the standalone deployment, is to use SIMR to launch Spark jobs inside MapReduce. With SIMR, users can start experimenting with Spark and use its shell within a couple of minutes after downloading it! This tremendously lowers the barrier of deployment, and lets virtually everyone play with Spark.
    
    
Apache Spark AWS S3 Datasource

This example reads a simple CSV file from the local disk and writes it to S3.
Then it reads the written data back from S3 and prints it in the notebook/console.

Assumptions
Your AWS S3 bucket has already been created, and you have its accessKeyId and secretAccessKey.

Coding
Open a Jupyter notebook and put the following in the first cell so that findspark can locate the Spark installation.

import findspark
# /opt/manual/spark: this is the SPARK_HOME path
findspark.init("/opt/manual/spark")
Import the standard PySpark libraries:

from pyspark.sql import SparkSession, functions as F
from pyspark import SparkConf, SparkContext
Libraries (Jars) for S3 Connection
Now here we will download the additional libraries (jar files) to be used in the S3 connection and put them among the other jar files of Spark. Of course, there are other ways to do this. But here I prefer this way.

When downloading files from the Maven repo, you should definitely pay attention to the spark-hadoop version (You decide this while downloading and installing spark and you choose among many compilations, my example is Spark 3.0.0. Hadoop 3.2). To prevent any mistake please pay attention to the following figures and explanations.



Downloading and moving jar files:
There are other jar files in spark installation. We will put these two into the `SPARK_HOME/jars` directory.

wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar
mv aws-java-sdk-bundle-1.11.375.jar hadoop-aws-3.2.0.jar /opt/manual/spark/jars/
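The two download URLs follow Maven Central's directory layout, so they can be derived from the Maven coordinates instead of typed by hand. A minimal sketch, assuming the versions used in this example; `maven_central_jar_url` is a hypothetical helper name.

```python
def maven_central_jar_url(group_id, artifact_id, version):
    """Build the Maven Central download URL of the jar for group_id:artifact_id:version."""
    group_path = group_id.replace(".", "/")  # org.apache.hadoop -> org/apache/hadoop
    return (
        f"https://repo1.maven.org/maven2/{group_path}/{artifact_id}/"
        f"{version}/{artifact_id}-{version}.jar"
    )


HADOOP_VERSION = "3.2.0"      # must match the Hadoop version of your Spark build
AWS_SDK_VERSION = "1.11.375"  # the SDK bundle paired with hadoop-aws 3.2.0 in this example

urls = [
    maven_central_jar_url("org.apache.hadoop", "hadoop-aws", HADOOP_VERSION),
    maven_central_jar_url("com.amazonaws", "aws-java-sdk-bundle", AWS_SDK_VERSION),
]
for url in urls:
    print(url)
```

Alternatively, the `spark.jars.packages` property accepts Maven coordinates such as `org.apache.hadoop:hadoop-aws:3.2.0` and lets Spark download the jar and its dependencies when the session starts, instead of copying files by hand.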
Spark Session
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # used below to set the S3A options
spark.version
Output
3.0.0
Configuration
Let’s first assign the access keys to variables. You can also read them from environment variables if you prefer. Never commit these keys to repositories such as GitHub or share them with anyone else; if in doubt, revoke the keys and generate new ones.

accessKeyId = 'your_access_key'
secretAccessKey = 'your_secret_key'
Now set the S3A options on the Hadoop configuration of the SparkContext:

sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', accessKeyId)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secretAccessKey)
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.amazonaws.com')
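An alternative that avoids the private `_jsc` handle is to pass the same settings as Spark properties with the `spark.hadoop.` prefix, which Spark copies into the Hadoop configuration when the SparkContext starts. A minimal sketch; `s3a_spark_properties` is a hypothetical helper, and reading the keys from the `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` environment variables is an assumption.

```python
import os


def s3a_spark_properties(access_key, secret_key, endpoint="s3.amazonaws.com"):
    """Turn the fs.s3a.* settings above into Spark properties.

    Spark copies every property prefixed with "spark.hadoop." into the
    Hadoop configuration when the SparkContext starts.
    """
    hadoop_settings = {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.path.style.access": "true",
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.endpoint": endpoint,
    }
    return {f"spark.hadoop.{key}": value for key, value in hadoop_settings.items()}


# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are the conventional AWS variable
# names; here they are simply read from the environment (empty if unset).
props = s3a_spark_properties(
    os.environ.get("AWS_ACCESS_KEY_ID", ""),
    os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
)
print(sorted(props))  # property names only, never the secret values
```

To apply them, chain one `.config(key, value)` call per item in `props` on `SparkSession.builder` before `.getOrCreate()`. The properties only take effect if no SparkContext is running yet, so restart the kernel first if a session already exists.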
Let’s read the dataset, simple_data.csv, from the local disk.

df = spark.read \
.option("inferSchema", True) \
.option("header", True) \
.csv("file:///home/train/datasets/simple_data.csv")
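Let’s take a look at the dataset (the column names are Turkish: sirano = row number, isim = name, yas = age, meslek = occupation, sehir = city, aylik_gelir = monthly income):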

df.show(3)
+------+-----+---+--------+--------+-----------+
|sirano| isim|yas|  meslek|   sehir|aylik_gelir|
+------+-----+---+--------+--------+-----------+
|     1|Cemal| 35|    Isci|  Ankara|       3500|
|     2|Ceyda| 42|   Memur| Kayseri|       4200|
|     3|Timur| 30|Müzisyen|Istanbul|       9000|
+------+-----+---+--------+--------+-----------+
Writing Spark Dataframe to AWS S3

With the matching jars in place and Spark configured, writing to S3 is no different from writing to the local disk; only the path changes to an s3a:// URI.

df.write.format('csv').option('header', 'true') \
.save('s3a://<your_bucket_name_here>/<your_folder_here>', mode='overwrite')

Reading Data from AWS S3 with Spark
Now let’s read this data again with Spark.

df_s3 = spark.read.format('csv').option('header', 'true') \
.load('s3a://<your_bucket_name_here>/<your_folder_here>')
df_s3.show(3)


    