In [1]:
import pyspark
from pyspark.sql import SparkSession
# Get (or create) the Spark session used by every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName('MissingValues')
    .getOrCreate()
)

In [2]:
# Bare expression: the notebook displays the SparkSession created above.
spark

In [55]:
# Load the CSV, treating the first line as the header and letting Spark infer column types.
df_pyspark = spark.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Print the column names and the types inferred while reading the CSV.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [56]:
# Preview the loaded rows; the last three rows contain the null values handled below.
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|    Suny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [17]:
# PySpark DataFrames have no .shape attribute, so build the (rows, columns) pair
# from count() and the length of the column list.
print('shape : ', (df_pyspark.count(), len(df_pyspark.columns)))

shape :  (9, 4)


In [22]:
# Called without arguments, na.drop() removes every row that holds at least one null.
df_pyspark2 = df_pyspark.na.drop()

In [24]:
# Only the rows with no null in any column remain.
df_pyspark2.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|    Suny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
+--------+---+----------+------+



In [25]:
# Row and column counts after dropping the rows that contained nulls.
print('shape : ', (df_pyspark2.count(), len(df_pyspark2.columns)))

shape :  (6, 4)


In [41]:
# Tip: in Jupyter, Shift + Tab + Tab on a method shows its documentation.
# Parameters of na.drop():
#   how    - 'any' (default) drops a row if any value is null;
#            'all' drops it only if every value is null
#   thresh - None by default; when set to k, rows with fewer than k non-null
#            values are dropped, and this takes precedence over `how`
# Because thresh is given, how='any' has no effect in this call.
df_pyspark3 = df_pyspark.na.drop(how='any', thresh=2)

In [43]:
# Rows with at least 2 non-null values are kept; only the row with a single non-null value is gone.
df_pyspark3.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|    Suny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|  Mahesh|null|      null| 40000|
|    null|  34|        10| 38000|
+--------+----+----------+------+



In [46]:
# `subset` limits the null check to the listed column(s): here a row is dropped
# only when its Age is null, whatever the other columns contain.
df_pyspark4 = df_pyspark.na.drop(how='any', subset=['Age'])

In [48]:
# The row with a null Age is removed; nulls in the other columns are left in place.
df_pyspark4.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|    Suny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
|    null| 34|        10| 38000|
|    null| 36|      null|  null|
+--------+---+----------+------+



In [72]:
# Replace nulls with fixed placeholder values, one entry per column.
# Each placeholder should have the same datatype as its column (a string for Name,
# integers for the numeric columns); a value of a different type may not be applied.
df_pyspark5 = df_pyspark.na.fill({
    'Name': 'Missing Value',
    'Age': -1,
    'Experience': -1,
    'Salary': -1,
})

In [70]:
# Display the result: former nulls now hold the per-column placeholder values.
df_pyspark5.show()

+-------------+---+----------+------+
|         Name|Age|Experience|Salary|
+-------------+---+----------+------+
|        Krish| 31|        10| 30000|
|     Sudhansh| 30|         8| 25000|
|         Suny| 29|         4| 20000|
|         Paul| 24|         3| 20000|
|       Harsha| 21|         1| 15000|
|      Shubham| 23|         2| 18000|
|       Mahesh| -1|        -1| 40000|
|Missing Value| 34|        10| 38000|
|Missing Value| 36|        -1|    -1|
+-------------+---+----------+------+



In [73]:
# Fill nulls with a value computed from each column (mean, median, etc.).
from pyspark.ml.feature import Imputer  # Imputer: estimates replacements for missing values

# Columns to impute; each gets a companion "<name>_imputed" output column.
impute_cols = ['Age', 'Experience', 'Salary']

imputer = Imputer(
    inputCols=impute_cols,
    outputCols=['{}_imputed'.format(col) for col in impute_cols],
).setStrategy('mean')

In [83]:
# Fit the imputer on df_pyspark, then use the fitted model to append
# the *_imputed columns to the same DataFrame.
imputer_model = imputer.fit(df_pyspark)
df_pyspark6 = imputer_model.transform(df_pyspark)

In [84]:
# The original columns are unchanged; the *_imputed columns carry the filled-in values.
df_pyspark6.show()

+--------+----+----------+------+-----------+------------------+--------------+
|    Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|   Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhansh|  30|         8| 25000|         30|                 8|         25000|
|    Suny|  29|         4| 20000|         29|                 4|         20000|
|    Paul|  24|         3| 20000|         24|                 3|         20000|
|  Harsha|  21|         1| 15000|         21|                 1|         15000|
| Shubham|  23|         2| 18000|         23|                 2|         18000|
|  Mahesh|null|      null| 40000|         28|                 5|         40000|
|    null|  34|        10| 38000|         34|                10|         38000|
|    null|  36|      null|  null|         36|                 5|         25750|
+--------+----+----------+------+-------