### Pyspark Handling Missing Values

- Dropping Columns
- Dropping Rows
- Variou Parameter In Dropping functionalities
- Handling Missing values by Mean, Median And Mode

In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('practice').getOrCreate()

In [4]:
spark

In [6]:
df_pyspark = spark.read.csv('dataset/test1.csv', header = True, inferSchema = True)

In [8]:
df_pyspark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
| Krish|  31|        10| 30000|
|Sudhan|  30|         8| 25000|
| Sunny|  29|         4| 20000|
|  Paul|  24|         3| 20000|
|Harsha|  21|         1| 15000|
|Eonkim|  23|         2| 18000|
|  Jeff|  24|         5| 21000|
|Mahesh|null|      null| 40000|
|  null|  34|        10| 38000|
|  null|  36|      null|  null|
+------+----+----------+------+



In [14]:
# drop column
df_pyspark.drop('Name').show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|  24|         5| 21000|
|null|      null| 40000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



In [16]:
# drop row
df_pyspark.dropna().show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
| Krish| 31|        10| 30000|
|Sudhan| 30|         8| 25000|
| Sunny| 29|         4| 20000|
|  Paul| 24|         3| 20000|
|Harsha| 21|         1| 15000|
|Eonkim| 23|         2| 18000|
|  Jeff| 24|         5| 21000|
+------+---+----------+------+



In [17]:
# .na.drop() == dropna()
df_pyspark.na.drop().show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
| Krish| 31|        10| 30000|
|Sudhan| 30|         8| 25000|
| Sunny| 29|         4| 20000|
|  Paul| 24|         3| 20000|
|Harsha| 21|         1| 15000|
|Eonkim| 23|         2| 18000|
|  Jeff| 24|         5| 21000|
+------+---+----------+------+



In [22]:
### any == how / default -> any
# all -> 전부 null 값을 가지면 제거한다.
# any -> 하나라도 null 값을 가지면 제거한다.
df_pyspark.na.drop(how = 'all').show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
| Krish|  31|        10| 30000|
|Sudhan|  30|         8| 25000|
| Sunny|  29|         4| 20000|
|  Paul|  24|         3| 20000|
|Harsha|  21|         1| 15000|
|Eonkim|  23|         2| 18000|
|  Jeff|  24|         5| 21000|
|Mahesh|null|      null| 40000|
|  null|  34|        10| 38000|
|  null|  36|      null|  null|
+------+----+----------+------+



In [24]:
df_pyspark.dropna(how = 'all').show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
| Krish|  31|        10| 30000|
|Sudhan|  30|         8| 25000|
| Sunny|  29|         4| 20000|
|  Paul|  24|         3| 20000|
|Harsha|  21|         1| 15000|
|Eonkim|  23|         2| 18000|
|  Jeff|  24|         5| 21000|
|Mahesh|null|      null| 40000|
|  null|  34|        10| 38000|
|  null|  36|      null|  null|
+------+----+----------+------+



In [41]:
# thresh -> 임계값이다. null 값이 아닌 column의 수를 의미한다.
# null 값이 아닌 column이 최소 thresh 만큼 있어야 한다는 것을 의미.
df_pyspark.dropna(how = 'all', thresh = 1).show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
| Krish|  31|        10| 30000|
|Sudhan|  30|         8| 25000|
| Sunny|  29|         4| 20000|
|  Paul|  24|         3| 20000|
|Harsha|  21|         1| 15000|
|Eonkim|  23|         2| 18000|
|  Jeff|  24|         5| 21000|
|Mahesh|null|      null| 40000|
|  null|  34|        10| 38000|
|  null|  36|      null|  null|
+------+----+----------+------+



In [38]:
df_pyspark.dropna(how = 'any', thresh = 3).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
| Krish| 31|        10| 30000|
|Sudhan| 30|         8| 25000|
| Sunny| 29|         4| 20000|
|  Paul| 24|         3| 20000|
|Harsha| 21|         1| 15000|
|Eonkim| 23|         2| 18000|
|  Jeff| 24|         5| 21000|
|  null| 34|        10| 38000|
+------+---+----------+------+



In [43]:
# subset
# 특정 열을 기준으로 dropna를 하고 싶을땐 subset을 통해 진행할 수 있다. 
df_pyspark.na.drop(how = 'any', subset = ['age']).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
| Krish| 31|        10| 30000|
|Sudhan| 30|         8| 25000|
| Sunny| 29|         4| 20000|
|  Paul| 24|         3| 20000|
|Harsha| 21|         1| 15000|
|Eonkim| 23|         2| 18000|
|  Jeff| 24|         5| 21000|
|  null| 34|        10| 38000|
|  null| 36|      null|  null|
+------+---+----------+------+



In [51]:
# Filling the Missing Value
df_pyspark1 = spark.read.csv('dataset/test1.csv', header = True)

In [53]:
df_pyspark1.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [56]:
# 만약 column의 schema가 string일 경우 string에 대해서만 채우기가 되고, 
# integer일 경우 fill 안의 value를 integer로 입력해야지만 fill이 된다. 
df_pyspark1.na.fill('Missing Values').show()

+--------------+--------------+--------------+--------------+
|          Name|           age|    Experience|        Salary|
+--------------+--------------+--------------+--------------+
|         Krish|            31|            10|         30000|
|        Sudhan|            30|             8|         25000|
|         Sunny|            29|             4|         20000|
|          Paul|            24|             3|         20000|
|        Harsha|            21|             1|         15000|
|        Eonkim|            23|             2|         18000|
|          Jeff|            24|             5|         21000|
|        Mahesh|Missing Values|Missing Values|         40000|
|Missing Values|            34|            10|         38000|
|Missing Values|            36|Missing Values|Missing Values|
+--------------+--------------+--------------+--------------+



In [58]:
df_pyspark1.fillna('Missing Values').show()

+--------------+--------------+--------------+--------------+
|          Name|           age|    Experience|        Salary|
+--------------+--------------+--------------+--------------+
|         Krish|            31|            10|         30000|
|        Sudhan|            30|             8|         25000|
|         Sunny|            29|             4|         20000|
|          Paul|            24|             3|         20000|
|        Harsha|            21|             1|         15000|
|        Eonkim|            23|             2|         18000|
|          Jeff|            24|             5|         21000|
|        Mahesh|Missing Values|Missing Values|         40000|
|Missing Values|            34|            10|         38000|
|Missing Values|            36|Missing Values|Missing Values|
+--------------+--------------+--------------+--------------+



In [60]:
# fillna(value, column)
# value -> 대체하고 싶은 값
# column -> 실행하고 싶은 값
df_pyspark1.fillna('Missing Values', 'Name').show()

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|        Sudhan|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|        Eonkim|  23|         2| 18000|
|          Jeff|  24|         5| 21000|
|        Mahesh|null|      null| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      null|  null|
+--------------+----+----------+------+



In [59]:
df_pyspark1.fillna(10).show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
| Krish|  31|        10| 30000|
|Sudhan|  30|         8| 25000|
| Sunny|  29|         4| 20000|
|  Paul|  24|         3| 20000|
|Harsha|  21|         1| 15000|
|Eonkim|  23|         2| 18000|
|  Jeff|  24|         5| 21000|
|Mahesh|null|      null| 40000|
|  null|  34|        10| 38000|
|  null|  36|      null|  null|
+------+----+----------+------+



In [64]:
df_pyspark.fillna('Missing Values', ['Name', 'Experience']).show()

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|        Sudhan|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|        Eonkim|  23|         2| 18000|
|          Jeff|  24|         5| 21000|
|        Mahesh|null|      null| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      null|  null|
+--------------+----+----------+------+



In [65]:
from pyspark.ml.feature import Imputer

In [87]:
# Imputer(inputCols, outputCols).setStrategy('mean')
# 새로운 column은 nan값을 setStartegy('mean')으로 대체한 상태로 채워진다.
imputer_mean = Imputer(
    inputCols = ['age', 'Experience', 'Salary'], 
    outputCols = ['{}.inputed'.format(c) for c in ['age', 'Experience', 'Salary']]
).setStrategy('mean')

In [88]:
imputer_mean.fit(df_pyspark).transform(df_pyspark).show()

+------+----+----------+------+-----------+------------------+--------------+
|  Name| age|Experience|Salary|age.inputed|Experience.inputed|Salary.inputed|
+------+----+----------+------+-----------+------------------+--------------+
| Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhan|  30|         8| 25000|         30|                 8|         25000|
| Sunny|  29|         4| 20000|         29|                 4|         20000|
|  Paul|  24|         3| 20000|         24|                 3|         20000|
|Harsha|  21|         1| 15000|         21|                 1|         15000|
|Eonkim|  23|         2| 18000|         23|                 2|         18000|
|  Jeff|  24|         5| 21000|         24|                 5|         21000|
|Mahesh|null|      null| 40000|         28|                 5|         40000|
|  null|  34|        10| 38000|         34|                10|         38000|
|  null|  36|      null|  null|         36|                 5|  

In [71]:
df_pyspark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
| Krish|  31|        10| 30000|
|Sudhan|  30|         8| 25000|
| Sunny|  29|         4| 20000|
|  Paul|  24|         3| 20000|
|Harsha|  21|         1| 15000|
|Eonkim|  23|         2| 18000|
|  Jeff|  24|         5| 21000|
|Mahesh|null|      null| 40000|
|  null|  34|        10| 38000|
|  null|  36|      null|  null|
+------+----+----------+------+



In [83]:
imputer_median = Imputer(
    inputCols = ['age', 'Experience', 'Salary'], 
    outputCols = [f'{c}.imputed' for c in ['age', 'Experience', 'Salary']]
).setStrategy('median')

In [86]:
imputer_median.fit(df_pyspark).transform(df_pyspark).show()

+------+----+----------+------+-----------+------------------+--------------+
|  Name| age|Experience|Salary|age.imputed|Experience.imputed|Salary.imputed|
+------+----+----------+------+-----------+------------------+--------------+
| Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhan|  30|         8| 25000|         30|                 8|         25000|
| Sunny|  29|         4| 20000|         29|                 4|         20000|
|  Paul|  24|         3| 20000|         24|                 3|         20000|
|Harsha|  21|         1| 15000|         21|                 1|         15000|
|Eonkim|  23|         2| 18000|         23|                 2|         18000|
|  Jeff|  24|         5| 21000|         24|                 5|         21000|
|Mahesh|null|      null| 40000|         29|                 4|         40000|
|  null|  34|        10| 38000|         34|                10|         38000|
|  null|  36|      null|  null|         36|                 4|  