# TUTORIAL 3

## Handling Missing Values in Spark Dataframe

 - Dropping Columns
 - Dropping Rows
 - Various Parameters in Dropping Functionality
 - Handling Missing Values by Mean, Median and Mode
 - How to read a dataset from URL

In [41]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import col

In [3]:
spark=SparkSession.builder.appName('Training').getOrCreate()

In [4]:
spark

In [17]:
spk_df=spark.read.csv('bhp.csv',header=True, inferSchema=True)

In [18]:
spk_df.show()

+--------------------+---------+----------+----+-----+---+--------------+
|            location|     size|total_sqft|bath|price|bhk|price_per_sqft|
+--------------------+---------+----------+----+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|   2|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|   5|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0|   2| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0|   3| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0|   2| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0|   2| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|   4|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|   4|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|   3|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|   6|370.0|  6|         36274|
|          Whitefield|    3 BHK|    18

In [19]:
#Dropping a column from the dataframe:

#Always remember that drop operation in Spark Dataframe is not inplace, so we have to assign>
#> the modified df once again to the original df


spk_df.drop('bath').show()

+--------------------+---------+----------+-----+---+--------------+
|            location|     size|total_sqft|price|bhk|price_per_sqft|
+--------------------+---------+----------+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|370.0|  6|         36274|
|          Whitefield|    3 BHK|    1800.0| 70.0|  3|          3888|
|          Whitefield|4 Bedroom|  

In [44]:
#Seeking Rows which have null values in one column or the other, using the where clause

spk_df.where(col('location').isNull()).show()

+--------+-----+----------+----+-----+---+--------------+
|location| size|total_sqft|bath|price|bhk|price_per_sqft|
+--------+-----+----------+----+-----+---+--------------+
|    null|3 BHK|    1925.0|null|125.0|  3|          null|
|    null|1 BHK|      null|   1| 38.0|  1|          6333|
|    null| null|      null|null| 50.0|  2|          3937|
|    null| null|      null|null| null|  3|          5307|
+--------+-----+----------+----+-----+---+--------------+



In [47]:
#Removing or Dropping the nan value without any parematere. In this situation it has dropped >
#> all the rows having null values in any of the columns, even if there is just one null value 


spk_df.na.drop().show()

+--------------------+---------+----------+----+-----+---+--------------+
|            location|     size|total_sqft|bath|price|bhk|price_per_sqft|
+--------------------+---------+----------+----+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|   2|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|   5|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0|   2| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0|   3| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0|   2| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0|   2| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|   4|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|   4|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|   3|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|   6|370.0|  6|         36274|
|          Whitefield|    3 BHK|    18

In [50]:
#using the parameters inside the drop

#Any - Entire Row would drop if any of the column has null values for that specific row.

spk_df.na.drop(how='any').show()

+--------------------+---------+----------+----+-----+---+--------------+
|            location|     size|total_sqft|bath|price|bhk|price_per_sqft|
+--------------------+---------+----------+----+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|   2|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|   5|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0|   2| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0|   3| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0|   2| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0|   2| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|   4|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|   4|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|   3|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|   6|370.0|  6|         36274|
|          Whitefield|    3 BHK|    18

In [51]:
#All - Row would drop only if all the values in columns are null for that specific row

spk_df.na.drop(how='all').show()

+--------------------+---------+----------+----+-----+---+--------------+
|            location|     size|total_sqft|bath|price|bhk|price_per_sqft|
+--------------------+---------+----------+----+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|   2|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|   5|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0|   2| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0|   3| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0|   2| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0|   2| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|   4|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|   4|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|   3|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|   6|370.0|  6|         36274|
|          Whitefield|    3 BHK|    18

In [53]:
#Threshold in drop

#Thresh - it basically helps to drop all those rows with the defined number of threshold nulls.
#Like in our case below all the row having 3 or greater nulls would get dropped

spk_df.na.drop(thresh=2).show()

+--------------------+---------+----------+----+-----+---+--------------+
|            location|     size|total_sqft|bath|price|bhk|price_per_sqft|
+--------------------+---------+----------+----+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|   2|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|   5|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0|   2| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0|   3| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0|   2| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0|   2| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|   4|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|   4|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|   3|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|   6|370.0|  6|         36274|
|          Whitefield|    3 BHK|    18

In [54]:
#Subset parameter in Drop

#Subset - It basically allows to remove the null values from that column which has been defined in the subset, >
#>and hence the corresponding row would be deleted

spk_df.na.drop(how='any', subset=['location']).show()

#Now in this situation all the records would have been deleted

+--------------------+---------+----------+----+-----+---+--------------+
|            location|     size|total_sqft|bath|price|bhk|price_per_sqft|
+--------------------+---------+----------+----+-----+---+--------------+
|Electronic City P...|    2 BHK|    1056.0|   2|39.07|  2|          3699|
|    Chikka Tirupathi|4 Bedroom|    2600.0|   5|120.0|  4|          4615|
|         Uttarahalli|    3 BHK|    1440.0|   2| 62.0|  3|          4305|
|  Lingadheeranahalli|    3 BHK|    1521.0|   3| 95.0|  3|          6245|
|            Kothanur|    2 BHK|    1200.0|   2| 51.0|  2|          4250|
|          Whitefield|    2 BHK|    1170.0|   2| 38.0|  2|          3247|
|    Old Airport Road|    4 BHK|    2732.0|   4|204.0|  4|          7467|
|        Rajaji Nagar|    4 BHK|    3300.0|   4|600.0|  4|         18181|
|        Marathahalli|    3 BHK|    1310.0|   3|63.25|  3|          4828|
|               other|6 Bedroom|    1020.0|   6|370.0|  6|         36274|
|          Whitefield|    3 BHK|    18

In [77]:
#Importing data using URL

from pyspark import SparkFiles
spark.sparkContext.addFile(url)
url='https://raw.githubusercontent.com/imamanmehrotra/Datasets/main/Salary_sample.csv'
df = spark.read.csv("file://"+SparkFiles.get("Salary_sample.csv"), header=True, inferSchema= True)
print('Schema and Dataset:\n')
df.printSchema()
df.show()

Schema and Dataset:

root
 |-- Experience: string (nullable = true)
 |-- Test_Score: integer (nullable = true)
 |-- Interview Score: integer (nullable = true)
 |-- Salary: integer (nullable = true)

+----------+----------+---------------+------+
|Experience|Test_Score|Interview Score|Salary|
+----------+----------+---------------+------+
|      null|         8|              9| 50000|
|      null|         8|              6| 45000|
|      five|         6|              7| 60000|
|       two|        10|             10| 65000|
|     seven|         9|              6| 70000|
|     three|         7|             10| 62000|
|       ten|      null|              7| 72000|
|    eleven|         7|              8| 80000|
+----------+----------+---------------+------+



In [81]:
#Filling the Missing Values

df.na.fill('Missing Values','Experience').show()

#It has replaced everything with the provided string


+--------------+----------+---------------+------+
|    Experience|Test_Score|Interview Score|Salary|
+--------------+----------+---------------+------+
|Missing Values|         8|              9| 50000|
|Missing Values|         8|              6| 45000|
|          five|         6|              7| 60000|
|           two|        10|             10| 65000|
|         seven|         9|              6| 70000|
|         three|         7|             10| 62000|
|           ten|      null|              7| 72000|
|        eleven|         7|              8| 80000|
+--------------+----------+---------------+------+



In [88]:
#Replaceing the Null value with Mean, Median or Mode: We have to use imputer function for that

from pyspark.ml.feature import Imputer

imputer=Imputer(
    inputCols=['Test_Score','Interview Score','Salary'],
    outputCols=['{}_imputed'.format(c) for c in ['Test_Score','Interview Score','Salary']]
).setStrategy('mean')

imputer.fit(df).transform(df).show()

+----------+----------+---------------+------+------------------+-----------------------+--------------+
|Experience|Test_Score|Interview Score|Salary|Test_Score_imputed|Interview Score_imputed|Salary_imputed|
+----------+----------+---------------+------+------------------+-----------------------+--------------+
|      null|         8|              9| 50000|                 8|                      9|         50000|
|      null|         8|              6| 45000|                 8|                      6|         45000|
|      five|         6|              7| 60000|                 6|                      7|         60000|
|       two|        10|             10| 65000|                10|                     10|         65000|
|     seven|         9|              6| 70000|                 9|                      6|         70000|
|     three|         7|             10| 62000|                 7|                     10|         62000|
|       ten|      null|              7| 72000|         

In [93]:
from pyspark.ml.feature import Imputer

imputer=Imputer(
    inputCols=['Test_Score','Interview Score','Salary'],
    outputCols=['{}_imputed'.format(c) for c in ['Test_Score','Interview Score','Salary']]
).setStrategy('median')

imputer.fit(df).transform(df).show()

+----------+----------+---------------+------+------------------+-----------------------+--------------+
|Experience|Test_Score|Interview Score|Salary|Test_Score_imputed|Interview Score_imputed|Salary_imputed|
+----------+----------+---------------+------+------------------+-----------------------+--------------+
|      null|         8|              9| 50000|                 8|                      9|         50000|
|      null|         8|              6| 45000|                 8|                      6|         45000|
|      five|         6|              7| 60000|                 6|                      7|         60000|
|       two|        10|             10| 65000|                10|                     10|         65000|
|     seven|         9|              6| 70000|                 9|                      6|         70000|
|     three|         7|             10| 62000|                 7|                     10|         62000|
|       ten|      null|              7| 72000|         