# Handling Missing Data

Data sources are often incomplete, which means you will have missing data. You have three basic options for handling missing data, and you will have to decide which approach is right for your data:

* Just keep the missing data points.
* Drop the missing data points (including the entire row).
* Fill them in with some other value.

Let's cover examples of each of these methods!

In [1]:
# Set the PySpark connection: locate the local Spark installation
import findspark
findspark.init()
import pyspark
findspark.find()

'C:\\Spark\\spark-3.1.2-bin-hadoop3.2'

In [2]:
from pyspark.sql import SparkSession
# May take a little while on a local computer
spark = SparkSession.builder.appName("missingdata").getOrCreate()

In [3]:
df = spark.read.csv("ContainsNull.csv",header=True,inferSchema=True)

In [4]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



Notice that Spark keeps the missing values as null.

## Drop the missing data

You can use the .na functions for missing data. The drop command has the following parameters:

    df.na.drop(how='any', thresh=None, subset=None)
    
    * param how: 'any' or 'all'.
    
        If 'any', drop a row if it contains any nulls.
        If 'all', drop a row only if all its values are null.
    
    * param thresh: int, default None
    
        If specified, drop rows that have fewer than `thresh` non-null values.
        This overrides the `how` parameter.
        
    * param subset: 
        optional list of column names to consider.
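
The interaction between `how`, `thresh` and `subset` can be sketched in plain Python. This is only a model of the rule described above applied to the four sample rows, not Spark's implementation; `keep_row` and `surviving_ids` are hypothetical helper names.

```python
# Plain-Python model of the row-keeping rule; not Spark's implementation.
rows = [
    {"Id": "emp1", "Name": "John", "Sales": None},
    {"Id": "emp2", "Name": None, "Sales": None},
    {"Id": "emp3", "Name": None, "Sales": 345.0},
    {"Id": "emp4", "Name": "Cindy", "Sales": 456.0},
]


def keep_row(row, how="any", thresh=None, subset=None):
    """Return True if the row would survive a drop with these arguments."""
    columns = subset if subset is not None else list(row)
    non_null = sum(row[c] is not None for c in columns)
    if thresh is not None:
        # thresh takes precedence over how
        return non_null >= thresh
    if how == "any":
        # dropped as soon as one considered column is null
        return non_null == len(columns)
    if how == "all":
        # dropped only when every considered column is null
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")


def surviving_ids(**kwargs):
    return [r["Id"] for r in rows if keep_row(r, **kwargs)]


print(surviving_ids())                  # ['emp4']
print(surviving_ids(thresh=2))          # ['emp1', 'emp3', 'emp4']
print(surviving_ids(subset=["Sales"]))  # ['emp3', 'emp4']
print(surviving_ids(how="all"))         # ['emp1', 'emp2', 'emp3', 'emp4']
```

Because every row has an `Id`, `how='all'` drops nothing in this dataset.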

In [5]:
# Drop any row that contains missing data
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [6]:
# Has to have at least 2 NON-null values
df.na.drop(thresh = 2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [7]:
df.na.drop(subset=["Sales"]).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [8]:
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [9]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## Fill the missing values

We can also fill the missing values with new values. If the nulls are spread across columns of different data types, Spark only fills the columns whose type matches the fill value: a string fills the string columns and a number fills the numeric columns. For example:

In [10]:
df.na.fill('NEW VALUE').show()

+----+---------+-----+
|  Id|     Name|Sales|
+----+---------+-----+
|emp1|     John| null|
|emp2|NEW VALUE| null|
|emp3|NEW VALUE|345.0|
|emp4|    Cindy|456.0|
+----+---------+-----+



In [11]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



Usually you should specify which columns you want to fill with the subset parameter.

In [12]:
df.na.fill('No Name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



A very common practice is to fill missing values with the mean value of the column. For example:

In [13]:
from pyspark.sql.functions import mean
mean_val = df.select(mean(df['Sales'])).collect()

# collect() returns a list of Row objects, so the value is nested two levels deep
mean_val

[Row(avg(Sales)=400.5)]

In [14]:
mean_val[0]

Row(avg(Sales)=400.5)

In [15]:
mean_val[0][0]

400.5

In [16]:
mean_sales = mean_val[0][0]

In [17]:
df.na.fill(mean_sales,["Sales"]).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [18]:
# One (very ugly) one-liner
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
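
The value 400.5 is the mean of the two non-null Sales values only: the aggregate skips nulls rather than counting them as zero. The arithmetic can be checked in plain Python, with `None` standing in for null:

```python
from statistics import mean

# Sales column from the sample DataFrame above; None stands in for null
sales = [None, None, 345.0, 456.0]

observed = [s for s in sales if s is not None]
mean_sales = mean(observed)                # nulls are left out of the average
zero_as_null = sum(observed) / len(sales)  # what counting nulls as 0 would give

# Mean imputation: replace each missing entry with the observed mean
imputed = [mean_sales if s is None else s for s in sales]

print(mean_sales)    # 400.5
print(zero_as_null)  # 200.25
print(imputed)       # [400.5, 400.5, 345.0, 456.0]
```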



### A Real-life Example

In [19]:
filename = "melb_data.csv"

In [21]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(filename, header=True, inferSchema=True, sep=',')
df.show(5, False)

+----------+----------------+-----+----+---------+------+-------+---------+--------+--------+--------+--------+---+--------+------------+---------+-----------+---------+----------+---------------------+-------------+
|Suburb    |Address         |Rooms|Type|Price    |Method|SellerG|Date     |Distance|Postcode|Bedroom2|Bathroom|Car|Landsize|BuildingArea|YearBuilt|CouncilArea|Lattitude|Longtitude|Regionname           |Propertycount|
+----------+----------------+-----+----+---------+------+-------+---------+--------+--------+--------+--------+---+--------+------------+---------+-----------+---------+----------+---------------------+-------------+
|Abbotsford|85 Turner St    |2    |h   |1480000.0|S     |Biggin |3/12/2016|2.5     |3067.0  |2.0     |1.0     |1.0|202.0   |null        |null     |Yarra      |-37.7996 |144.9984  |Northern Metropolitan|4019.0       |
|Abbotsford|25 Bloomburg St |2    |h   |1035000.0|S     |Biggin |4/02/2016|2.5     |3067.0  |2.0     |1.0     |0.0|156.0   |79.0    

In [22]:
df.describe()

DataFrame[summary: string, Suburb: string, Address: string, Rooms: string, Type: string, Price: string, Method: string, SellerG: string, Date: string, Distance: string, Postcode: string, Bedroom2: string, Bathroom: string, Car: string, Landsize: string, BuildingArea: string, YearBuilt: string, CouncilArea: string, Lattitude: string, Longtitude: string, Regionname: string, Propertycount: string]

In [23]:
df.describe().toPandas()

| | summary | Suburb | Address | Rooms | Type | Price | Method | SellerG | Date | Distance | ... | Bathroom | Car | Landsize | BuildingArea | YearBuilt | CouncilArea | Lattitude | Longtitude | Regionname | Propertycount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 13580 | 13580 | 13580.0 | 13580 | 13580.0 | 13580 | 13580 | 13580 | 13580.0 | ... | 13580.0 | 13518.0 | 13580.0 | 7130.0 | 8205.0 | 12211 | 13580.0 | 13580.0 | 13580 | 13580.0 |
| 1 | mean | | | 2.9379970544919 | | 1075684.079455081 | | | | 10.137776141384116 | ... | 1.5342415316642122 | 1.6100754549489569 | 558.4161266568483 | 151.96764988779805 | 1964.6842169408897 | | -37.80920273343151 | 144.99521618777578 | | 7454.417378497791 |
| 2 | stddev | | | 0.9557479384215564 | | 639310.7242960163 | | | | 5.868724943071715 | ... | 0.6917117224588424 | 0.962633519245631 | 3990.669241109034 | 541.0145376263513 | 37.27376222396062 | | 0.0792598226035583 | 0.1039155614073097 | | 4378.581771795497 |
| 3 | min | Abbotsford | 1 Adelle Ct | 1.0 | h | 85000.0 | PI | @Realty | 1/07/2017 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 1196.0 | Banyule | -38.18255 | 144.43181 | Eastern Metropolitan | 249.0 |
| 4 | max | Yarraville | 9b Stewart St | 10.0 | u | 9000000.0 | VB | iTRAK | 9/09/2017 | 48.1 | ... | 8.0 | 10.0 | 433014.0 | 44515.0 | 2018.0 | Yarra Ranges | -37.40853 | 145.52635 | Western Victoria | 21650.0 |


In [24]:
df.printSchema()

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Postcode: double (nullable = true)
 |-- Bedroom2: double (nullable = true)
 |-- Bathroom: double (nullable = true)
 |-- Car: double (nullable = true)
 |-- Landsize: double (nullable = true)
 |-- BuildingArea: double (nullable = true)
 |-- YearBuilt: double (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- Lattitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Propertycount: double (nullable = true)



### Question 1: Calculate the cardinality for each variable in the dataset. Can you reject any variables based on cardinality? (Hint: Look for cardinality of 1)

In [25]:
from pyspark.sql.functions import approx_count_distinct, countDistinct

"""
Note: approx_count_distinct and countDistinct both count the distinct values in a column.

"approx_count_distinct" returns an estimate and is faster, which is useful for large datasets.
"countDistinct" returns the exact count and suits small and medium datasets.

"""

def cardinality_calculation(df, cut_off=1):
    cardinality = df.select(*[approx_count_distinct(c).alias(c) for c in df.columns])
    
    ## the result is a single row, so convert to pandas for easy reshaping
    final_cardinality_df = cardinality.toPandas().transpose()
    final_cardinality_df.reset_index(inplace=True) 
    final_cardinality_df.rename(columns={0:'Cardinality'}, inplace=True) 
    
    #select variables with cardinality at or below cut_off (1 by default)
    vars_selected = final_cardinality_df['index'][final_cardinality_df['Cardinality'] <= cut_off] 
    
    return final_cardinality_df, vars_selected


In [26]:
cardinality_df, cardinality_vars_selected = cardinality_calculation(df)

In [27]:
cardinality_df

| | index | Cardinality |
|---|---|---|
| 0 | Suburb | 318 |
| 1 | Address | 14421 |
| 2 | Rooms | 9 |
| 3 | Type | 3 |
| 4 | Price | 2328 |
| 5 | Method | 5 |
| 6 | SellerG | 277 |
| 7 | Date | 58 |
| 8 | Distance | 207 |
| 9 | Postcode | 207 |


In [28]:
cardinality_vars_selected

Series([], Name: index, dtype: object)

##### Answer: You cannot reject any variables based on cardinality.
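
Note that the cardinalities above are estimates: the value reported for Address (14421) is larger than the 13580 rows in the dataset, which an exact count could never be. As a reference point, an exact count of distinct non-null values can be sketched in plain Python. The rows below are made up for illustration, and the constant `Country` column is a hypothetical addition that shows what a cardinality of 1 looks like.

```python
# Hypothetical rows; "Country" is constant to show what a cardinality of 1 looks like
rows = [
    {"Suburb": "Abbotsford", "Rooms": 2, "Country": "AU"},
    {"Suburb": "Abbotsford", "Rooms": 3, "Country": "AU"},
    {"Suburb": "Yarraville", "Rooms": 2, "Country": "AU"},
    {"Suburb": None, "Rooms": 4, "Country": "AU"},
]


def exact_cardinality(rows):
    """Count distinct non-null values per column."""
    columns = rows[0].keys()
    return {c: len({r[c] for r in rows if r[c] is not None}) for c in columns}


def low_cardinality(cardinality, cut_off=1):
    """Columns whose cardinality is at or below cut_off."""
    return [c for c, n in cardinality.items() if n <= cut_off]


cardinality = exact_cardinality(rows)
print(cardinality)                   # {'Suburb': 2, 'Rooms': 3, 'Country': 1}
print(low_cardinality(cardinality))  # ['Country']
```

A column with a single distinct value carries no information for a model, which is why it is a candidate for rejection.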

### Question 2: Calculate the missing value percentage for each variable in the dataset. Can you eliminate any variables based on missing values? (Hint: Eliminate any variable that has more than 45% missing values.)

In [29]:
from pyspark.sql.functions import count, when, isnan, col

def missing_calculation(df, miss_percentage=0.45):
    
    #checks for both NaN and null values
    missing = df.select(*[count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
    length_df = df.count()
    ## convert to pandas for efficient calculations
    final_missing_df = missing.toPandas().transpose()
    final_missing_df.reset_index(inplace=True) 
    final_missing_df.rename(columns={0:'missing_count'}, inplace=True) 
    final_missing_df['missing_percentage'] = final_missing_df['missing_count']/length_df
    
    #select variables whose missing fraction is at least miss_percentage (45% by default)
    vars_selected = final_missing_df['index'][final_missing_df['missing_percentage'] >= miss_percentage] 
    
    return final_missing_df, vars_selected

In [30]:
missing_df, missing_vars_selected = missing_calculation(df)

In [31]:
missing_df

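| | index | missing_count | missing_percentage |
|---|---|---|---|
| 0 | Suburb | 0 | 0.0 |
| 1 | Address | 0 | 0.0 |
| 2 | Rooms | 0 | 0.0 |
| 3 | Type | 0 | 0.0 |
| 4 | Price | 0 | 0.0 |
| 5 | Method | 0 | 0.0 |
| 6 | SellerG | 0 | 0.0 |
| 7 | Date | 0 | 0.0 |
| 8 | Distance | 0 | 0.0 |
| 9 | Postcode | 0 | 0.0 |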


In [33]:
missing_vars_selected

14    BuildingArea
Name: index, dtype: object

##### Answer: You can drop the "BuildingArea" variable based on missing values.
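
The selection can be cross-checked by hand from the `describe()` counts shown earlier (13580 rows, 7130 non-null BuildingArea values, 8205 non-null YearBuilt values). The sketch below is plain Python rather than Spark: `is_missing` and `missing_fraction` are hypothetical helpers that treat both `None` and NaN as missing, and the small `column` list is made up.

```python
import math


def is_missing(value):
    """True for None (null) and for float NaN, the two cases checked above."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def missing_fraction(values):
    """Share of entries in one column that are missing."""
    return sum(is_missing(v) for v in values) / len(values)


# Hypothetical column mixing both kinds of missing marker
column = [79.0, None, float("nan"), 150.0]
print(missing_fraction(column))  # 0.5

# Counts taken from the describe() output earlier in this notebook
total_rows = 13580
building_area_missing = total_rows - 7130
year_built_missing = total_rows - 8205

for name, missing in [("BuildingArea", building_area_missing),
                      ("YearBuilt", year_built_missing)]:
    fraction = missing / total_rows
    print(name, round(fraction, 3), fraction >= 0.45)
```

BuildingArea comes out at roughly 47.5% missing and crosses the threshold, while YearBuilt at roughly 39.6% does not.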

### Question 3: Impute the YearBuilt column with the median value. (Hint: Use 0.1 for relativeError)

In [39]:
# approxQuantile returns a list with one value per requested probability;
# the third argument is relativeError, so the result is an approximate median
median_YearBuilt = df.approxQuantile('YearBuilt', [0.5], 0.1)

In [40]:
median_YearBuilt

[1960.0]

In [37]:
median_YearBuilt[0]

1960.0
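
The median above is approximate. As I read the Spark documentation for `approxQuantile`, `relativeError` bounds the rank of the returned value: for N values and target probability p, the rank lies between floor((p - err) * N) and ceil((p + err) * N). The plain-Python sketch below lists which values that window would allow. It assumes ranks are 1-based positions in the sorted data, and the `years` list and `acceptable_values` helper are made up for illustration.

```python
import math
from statistics import median

# Hypothetical build years
years = [1900, 1910, 1920, 1930, 1940, 1950, 1960, 1970, 1980, 1990, 2000]


def acceptable_values(values, probability, relative_error):
    """Values whose 1-based rank falls inside the documented error window."""
    ordered = sorted(values)
    n = len(ordered)
    lowest_rank = max(1, math.floor((probability - relative_error) * n))
    highest_rank = min(n, math.ceil((probability + relative_error) * n))
    return ordered[lowest_rank - 1:highest_rank]


print(median(years))                       # 1950
print(acceptable_values(years, 0.5, 0.1))  # [1930, 1940, 1950, 1960]
```

A smaller `relativeError` narrows the window at the cost of more computation; a value of 0 requests the exact quantile, which can be expensive on large data.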

In [41]:
# count missing values before imputation
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['YearBuilt']]).show()

+---------+
|YearBuilt|
+---------+
|     5375|
+---------+



In [45]:
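from pyspark.sql.functions import when, isnan, col, isnull

# Replace null or NaN values in YearBuilt with the median; keep all other values
is_missing = isnan('YearBuilt') | isnull('YearBuilt')
df = df.withColumn('imp_YearBuilt', when(is_missing, median_YearBuilt[0]).otherwise(col('YearBuilt')))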

In [46]:
# count missing values after imputation
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['imp_YearBuilt']]).show()

+-------------+
|imp_YearBuilt|
+-------------+
|            0|
+-------------+



### Impute existing column with median value

In [48]:
df = df.fillna({'YearBuilt' :median_YearBuilt[0]})

In [49]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['YearBuilt']]).show()

+---------+
|YearBuilt|
+---------+
|        0|
+---------+



### Question 4: Impute BuildingArea column to create a new variable called mean_imputed_BuildingArea and calculate the new mean after imputation.

In [55]:
from pyspark.sql.functions import mean, avg

mean_before_imputation = df.agg(avg('BuildingArea')).first()[0]

mean_before_imputation

151.96764988779805

In [56]:
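# Fills BuildingArea in place (the mean_imputed_BuildingArea column from the question is not created here)
df = df.fillna({'BuildingArea': mean_before_imputation})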

In [57]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['BuildingArea']]).show()

+------------+
|BuildingArea|
+------------+
|           0|
+------------+



In [58]:
df.agg(avg('BuildingArea')).first()[0]

151.96764988779142
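
The mean is unchanged by mean imputation, apart from floating-point rounding in the last digits. Assuming none of the values are NaN, there are $n = 7130$ observed values (the BuildingArea count reported by `describe()`) and $k = 13580 - 7130 = 6450$ missing values, each replaced by the original mean $\bar{x}$:

$$
\bar{x}_{\text{new}} = \frac{n\,\bar{x} + k\,\bar{x}}{n + k} = \frac{(n + k)\,\bar{x}}{n + k} = \bar{x}
$$

Mean imputation therefore preserves the column mean, but it narrows the spread of the column because 6450 values now sit exactly at the center.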

In [59]:
spark.stop()

### Great job!