# Pyspark Handling Missing Values

* Dropping Columns
* Dropping Rows
* Various Parameter in Dropping functionalities.
* Handling Missing values by Mean, Median and Mode.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [20]:
df_pyspark = spark.read.csv('DataClean.csv', header=True, inferSchema=True)

In [21]:
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   David|  21|        10| 30000|
|    Paul|  31|         3| 25000|
|Jonathan|  26|         6| 18000|
| Phillip|  19|         4| 16000|
|  Mathew|NULL|         4| 26000|
|  Hakeem|  34|      NULL| 29000|
|Suleiman|  23|         5| 21000|
|    Hawa|NULL|      NULL| 25000|
|    NULL|  26|         9|  NULL|
|    NULL|  20|      NULL| 23000|
+--------+----+----------+------+



In [22]:
##drop the columns
dp = df_pyspark.drop('Name')
dp.show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  21|        10| 30000|
|  31|         3| 25000|
|  26|         6| 18000|
|  19|         4| 16000|
|NULL|         4| 26000|
|  34|      NULL| 29000|
|  23|         5| 21000|
|NULL|      NULL| 25000|
|  26|         9|  NULL|
|  20|      NULL| 23000|
+----+----------+------+



In [23]:
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   David|  21|        10| 30000|
|    Paul|  31|         3| 25000|
|Jonathan|  26|         6| 18000|
| Phillip|  19|         4| 16000|
|  Mathew|NULL|         4| 26000|
|  Hakeem|  34|      NULL| 29000|
|Suleiman|  23|         5| 21000|
|    Hawa|NULL|      NULL| 25000|
|    NULL|  26|         9|  NULL|
|    NULL|  20|      NULL| 23000|
+--------+----+----------+------+



In [29]:
dp_na = df_pyspark.na.drop()
dp_na.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   David| 21|        10| 30000|
|    Paul| 31|         3| 25000|
|Jonathan| 26|         6| 18000|
| Phillip| 19|         4| 16000|
|Suleiman| 23|         5| 21000|
+--------+---+----------+------+



In [31]:
### any==how
# we have 'any' and 'all'
# thres = "atleast n non-null value."
# subset
dp_na = df_pyspark.na.drop(how='all', thresh=3)
dp_na.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   David|  21|        10| 30000|
|    Paul|  31|         3| 25000|
|Jonathan|  26|         6| 18000|
| Phillip|  19|         4| 16000|
|  Mathew|NULL|         4| 26000|
|  Hakeem|  34|      NULL| 29000|
|Suleiman|  23|         5| 21000|
+--------+----+----------+------+



In [34]:
# subset
# Applying the na method to a specific column
dp_na = df_pyspark.na.drop(how='all', subset=['Age'])
dp_na.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   David| 21|        10| 30000|
|    Paul| 31|         3| 25000|
|Jonathan| 26|         6| 18000|
| Phillip| 19|         4| 16000|
|  Hakeem| 34|      NULL| 29000|
|Suleiman| 23|         5| 21000|
|    NULL| 26|         9|  NULL|
|    NULL| 20|      NULL| 23000|
+--------+---+----------+------+



In [44]:
## Filling the Missing value
df_pyspark.na.fill("Missing", 'Experience').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   David|  21|        10| 30000|
|    Paul|  31|         3| 25000|
|Jonathan|  26|         6| 18000|
| Phillip|  19|         4| 16000|
|  Mathew|NULL|         4| 26000|
|  Hakeem|  34|      NULL| 29000|
|Suleiman|  23|         5| 21000|
|    Hawa|NULL|      NULL| 25000|
|    NULL|  26|         9|  NULL|
|    NULL|  20|      NULL| 23000|
+--------+----+----------+------+



In [49]:
from pyspark.ml.feature import Imputer

mylist = ["Age", 'Experience', 'Salary']
imputer = Imputer(
    inputCols=mylist,
    outputCols=[f"{c}_imputed" for c in mylist]
).setStrategy("mean")

In [50]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+--------+----+----------+------+-----------+------------------+--------------+
|    Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|   David|  21|        10| 30000|         21|                10|         30000|
|    Paul|  31|         3| 25000|         31|                 3|         25000|
|Jonathan|  26|         6| 18000|         26|                 6|         18000|
| Phillip|  19|         4| 16000|         19|                 4|         16000|
|  Mathew|NULL|         4| 26000|         25|                 4|         26000|
|  Hakeem|  34|      NULL| 29000|         34|                 5|         29000|
|Suleiman|  23|         5| 21000|         23|                 5|         21000|
|    Hawa|NULL|      NULL| 25000|         25|                 5|         25000|
|    NULL|  26|         9|  NULL|         26|                 9|         23666|
|    NULL|  20|      NULL| 23000|       

## Filter Operations

* &, |, ==
* ~

In [53]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Filter").getOrCreate()

In [54]:
df_spark = spark.read.csv('filter.csv', header=True, inferSchema=True)
df_spark.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   David| 21|        10| 30000|
|    Paul| 31|         3| 25000|
|Jonathan| 26|         6| 18000|
| Phillip| 19|         4| 16000|
|Suleiman| 23|         5| 21000|
+--------+---+----------+------+



In [59]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [60]:
### Salary of the people less than or equal to 20000
df_spark.filter('Salary<=20000').show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|Jonathan| 26|         6| 18000|
| Phillip| 19|         4| 16000|
+--------+---+----------+------+



In [64]:
df_spark.filter("Salary > 20000").select(['Name','Salary']).show()

+--------+------+
|    Name|Salary|
+--------+------+
|   David| 30000|
|    Paul| 25000|
|Suleiman| 21000|
+--------+------+



In [67]:
## Combining filters
df_spark.filter((df_spark['Salary']>25000) &
                (df_spark['Age']>20)
                ).show()

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
|David| 21|        10| 30000|
+-----+---+----------+------+



In [69]:
df_spark.filter((df_spark['Salary']>25000) |
                (df_spark['Age']>20)
                ).select(['Name','Experience']).show()

+--------+----------+
|    Name|Experience|
+--------+----------+
|   David|        10|
|    Paul|         3|
|Jonathan|         6|
|Suleiman|         5|
+--------+----------+



In [70]:
## Inverse operation
df_spark.filter(~(df_spark['Salary']>25000)
                ).select(['Name','Experience']).show()

+--------+----------+
|    Name|Experience|
+--------+----------+
|    Paul|         3|
|Jonathan|         6|
| Phillip|         4|
|Suleiman|         5|
+--------+----------+



24/06/15 00:05:31 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1800911 ms exceeds timeout 120000 ms
24/06/15 00:05:32 WARN SparkContext: Killing executors is not supported by current scheduler.
24/06/15 00:05:33 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at 