<a href="https://colab.research.google.com/github/Akshatpattiwar512/Big-data-ml/blob/main/Apache_Spark_Part_3_Advance_Dataframe_Operations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Apache Spark-Part 3: Advance Dataframe Operations

The following notebook is based on the article [Apache Spark-Part 3: Advance Dataframe Operations](https://www.linkedin.com/pulse/apache-spark-part-3-advance-dataframe-operations-akshat-pattiwar/).

This notebook contains implementations of advanced DataFrame operations.

##Installing pySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=4141b67b8811fea8354684e44d42dff47f38e6780bbc1eb053996395fdb14001
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


##Import Required Libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

##Create SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

##Create DataFrame

In [4]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"]) 

# Create Temp Tables
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

# Save as HIVE tables.
df.write.saveAsTable("hive_empdf", mode = "overwrite")
deptdf.write.saveAsTable("hive_deptdf", mode = "overwrite")

##BroadCast Join

By default, Spark automatically broadcasts a table in a join only if its size is below 10 MB. However, we can change this threshold, up to 8 GB according to the official documentation of Spark 2.3.

* We can change the threshold, for example to 50 MB:

In [6]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

* We can then check the current threshold as follows:

In [7]:
# The setting is stored in bytes, so divide by 1024 * 1024 to get MB.
size = int(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) / (1024 * 1024)
print("Current broadcast threshold is {0} MB.".format(size))

Current broadcast threshold is 50.0 MB.

##Caching

We can use the cache or persist function to keep a DataFrame in memory. Caching data that is reused frequently can improve the performance of a Spark application significantly.

In [8]:
df.cache()
df.count()
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


When we use the cache function, the storage level is MEMORY_ONLY up to Spark 2.0.2. Since Spark 2.1.x it is MEMORY_AND_DISK, which is why both flags above are True.

However, if we need to choose among the available storage levels, we can use the persist method. For example, to keep the data in memory only, we can use the snippet below.

In [9]:
from pyspark.storagelevel import StorageLevel

In [10]:
deptdf.persist(StorageLevel.MEMORY_ONLY)
deptdf.count()
print("Memory Used : {0}".format(deptdf.storageLevel.useMemory))
print("Disk Used : {0}".format(deptdf.storageLevel.useDisk))

Memory Used : True
Disk Used : False


##Un-Persist

It is also important to un-cache the data once it is no longer required.

In [11]:
df.unpersist()

DataFrame[id: bigint, name: string, dept: string, salary: bigint]

##SQL Expressions

We can also use SQL expressions to manipulate data. Spark provides the "expr" function, and a variant of the select method called "selectExpr", for evaluating SQL expressions.

In [12]:
from pyspark.sql.functions import expr

# Let's categorize the salary as low, mid or high using the ranges below.

# 1 - 2000 : low_salary
# 2001 - 5000 : mid_salary
# > 5000 : high_salary

cond = """case when salary > 5000 then 'high_salary'
               else case when salary > 2000 then 'mid_salary'
                    else case when salary > 0 then 'low_salary'
                         else 'invalid_salary'
                              end
                         end
                end as salary_level"""
newdf = df.withColumn("salary_level", expr(cond))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



##Using selectExpr function

In [13]:
newdf = df.selectExpr("*", cond)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



##User Defined Functions (UDF)

Often we need to write a function for a very specific requirement. Here we can leverage UDFs: we write our own function in a language such as Python, register it as a UDF, and then use it in DataFrame operations.

* Python function to find the salary_level for a given salary.

In [14]:
def detSalary_Level(sal):
    level = None

    if(sal > 5000):
        level = 'high_salary'
    elif(sal > 2000):
        level = 'mid_salary'
    elif(sal > 0):
        level = 'low_salary'
    else:
        level = 'invalid_salary'
    return level

* Then register the function "detSalary_Level" as UDF.

In [15]:
sal_level = udf(detSalary_Level, StringType())

* Apply function to determine the salary_level for given salary.

In [16]:
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



##Working with NULL Values

NULL values are always tricky to deal with, irrespective of the framework or language we use. Spark provides a few specific functions to deal with NULL values.

###isNull()

This function helps us find the null values in a given column. For example, to find the rows where the dept column is null:

In [17]:
newdf = df.filter(df["dept"].isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|null|null|null|  7500|
|   9| III|null|  4500|
+----+----+----+------+



###isNotNull()

This function is the opposite of isNull() and returns the rows where the given column is not null.

In [18]:
newdf = df.filter(df["dept"].isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



###fillna()

This function will help us to replace Null values.

In [19]:
# Replace null values in the dept column with "INVALID".
newdf = df.fillna("INVALID", ["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|null|null|INVALID|  7500|
|   9| III|INVALID|  4500|
|  10|null|  dept5|  2500|
+----+----+-------+------+



###dropna()

This function will help us to remove the rows with null values.

In [20]:
# Remove every row that contains at least one null value.
newdf = df.dropna()
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



In [21]:
# Remove only the rows in which every column is null (no such row exists here).
newdf = df.dropna(how = "all")
newdf.show()

+----+----+-----+------+
|  id|name| dept|salary|
+----+----+-----+------+
|   1| AAA|dept1|  1000|
|   2| BBB|dept1|  1100|
|   3| CCC|dept1|  3000|
|   4| DDD|dept1|  1500|
|   5| EEE|dept2|  8000|
|   6| FFF|dept2|  7200|
|   7| GGG|dept3|  7100|
|null|null| null|  7500|
|   9| III| null|  4500|
|  10|null|dept5|  2500|
+----+----+-----+------+



In [22]:
# Remove all rows where the dept column is null.
newdf = df.dropna(subset = "dept")
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



##Partitioning

Partitioning is a very important aspect of controlling the parallelism of a Spark application.

* Check number of partitions.

In [23]:
df.rdd.getNumPartitions()

2

* Increase the number of partitions. For example, increase the partitions to 6.

In [24]:
newdf = df.repartition(6)
newdf.rdd.getNumPartitions()

6

**Note: This is an expensive operation since it requires shuffling data across the workers.**

* Decrease the number of partitions. For example, decrease the partitions to 2.

In [25]:
newdf = df.coalesce(2)
newdf.rdd.getNumPartitions()

2

* By default, the number of shuffle partitions for Spark SQL is 200.

* But we can also set the number of shuffle partitions at the Spark application level. For example, set it to 500.

In [26]:
# Set the number of shuffle partitions for the Spark application.
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Check the number of partitions.
num_part = spark.conf.get("spark.sql.shuffle.partitions")
print("No of Partitions : {0}".format(num_part))

No of Partitions : 500


##Catalog API

Spark Catalog is a user-facing API, which you can access using SparkSession.catalog.

* listDatabases()

It will return all the databases along with their location on the file system.

In [27]:
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/content/spark-warehouse')]

* listTables()

It will return all the tables for a given database along with the information like Table type (External/Managed) and whether a particular table is temporary or permanent. This includes all temporary views.

In [28]:
spark.catalog.listTables("default")

[Table(name='hive_deptdf', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='hive_empdf', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='deptdf', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='empdf', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

* listColumns()

It will return all the columns of a particular table in a database, along with each column's data type and whether the column is used for partitioning or bucketing.

In [29]:
spark.catalog.listColumns("hive_empdf", "default")

[Column(name='id', description=None, dataType='bigint', nullable=True, isPartition=False, isBucket=False),
 Column(name='name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='dept', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='salary', description=None, dataType='bigint', nullable=True, isPartition=False, isBucket=False)]

* listFunctions()

It will return all the functions available in the Spark session, along with whether each one is temporary.

In [30]:
spark.catalog.listFunctions()

[Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True),
 Function(name='%', description=None, className='org.apache.spark.sql.catalyst.expressions.Remainder', isTemporary=True),
 Function(name='&', description=None, className='org.apache.spark.sql.catalyst.expressions.BitwiseAnd', isTemporary=True),
 Function(name='*', description=None, className='org.apache.spark.sql.catalyst.expressions.Multiply', isTemporary=True),
 Function(name='+', description=None, className='org.apache.spark.sql.catalyst.expressions.Add', isTemporary=True),
 Function(name='-', description=None, className='org.apache.spark.sql.catalyst.expressions.Subtract', isTemporary=True),
 Function(name='/', description=None, className='org.apache.spark.sql.catalyst.expressions.Divide', isTemporary=True),
 Function(name='<', description=None, className='org.apache.spark.sql.catalyst.expressions.LessThan', isTemporary=True),
 Function(name='<=', description=None, cl

* currentDatabase()

Get the current database.

In [31]:
spark.catalog.currentDatabase()

'default'

* setCurrentDatabase()

Set the current database.

In [32]:
# Un-comment below code and specify the DB_Name to be set as current Database.
#spark.catalog.setCurrentDatabase(<DB_Name>)

* cacheTable()

Cache a particular table.

In [33]:
spark.catalog.cacheTable("default.hive_empdf")

* isCached()

Check whether a table is cached.

In [34]:
spark.catalog.isCached("default.hive_empdf")

True

* uncacheTable()

Un-cache a particular table.

In [35]:
spark.catalog.uncacheTable("default.hive_empdf")

In [36]:
# Verify that the table is no longer cached: this now returns False.
spark.catalog.isCached("default.hive_empdf")

False

* clearCache()

Un-cache all tables in the Spark session.

In [37]:
spark.catalog.clearCache()

-------------------------------------------------------------------
# End of part 3

-------------------------------------------------------------------