In [None]:
# Example of Imputer function in PySpark
# ref: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Imputer

""" What is Imputation?
Imputation means replacing the missing values in a dataset with the substituted values.
The substituted values can be any random value, mean, median. But it is not advisable to use random values.
The common methods are mean, median.
In PySpark, the Imputer estimator is used to fill the missing values with either the mean or the median of the column.
The column should be of type Double or Float. 
Currently the Imputer does not support categorical features.

Note: mean, median values are computed after filtering out the missing values
"""

In [2]:
from __future__ import print_function
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

In [3]:
# Get (or create) the SparkSession used throughout this example.
spark = (
    SparkSession.builder
    .appName("Imputer_Example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [5]:
# Sample data built in-memory, written column by column and zipped into rows.
# float("nan") marks the missing entries that will be imputed later.
df = spark.createDataFrame(
    list(zip(
        [10.0, 20.0, 30.0, float("nan"), 50.0, 60.0, 70.0],          # column "x"
        [10.1, float("nan"), float("nan"), 40.1, 50.1, 60.1, 70.1],  # column "y"
    )),
    schema=["x", "y"],
)

In [6]:
# Display the sample data; the NaN entries are the missing values to impute.
df.show()

+----+----+
|   x|   y|
+----+----+
|10.0|10.1|
|20.0| NaN|
|30.0| NaN|
| NaN|40.1|
|50.0|50.1|
|60.0|60.1|
|70.0|70.1|
+----+----+



In [8]:
# Build the Imputer estimator: missing entries of "x" and "y" will be filled into the
# new columns "im_x" and "im_y". No strategy is passed, so the default one is used.
imputer = Imputer(inputCols=["x", "y"], outputCols=["im_x", "im_y"])

In [9]:
# Fitting the model: computes the replacement value for each input column from `df`.
model = imputer.fit(df)

In [10]:
# Apply the fitted model: the original columns are kept and the imputed
# columns im_x / im_y are appended next to them.
model.transform(df).show()

+----+----+----+----+
|   x|   y|im_x|im_y|
+----+----+----+----+
|10.0|10.1|10.0|10.1|
|20.0| NaN|20.0|46.1|
|30.0| NaN|30.0|46.1|
| NaN|40.1|40.0|40.1|
|50.0|50.1|50.0|50.1|
|60.0|60.1|60.0|60.1|
|70.0|70.1|70.0|70.1|
+----+----+----+----+



In [11]:
# Explicitly declaring median as the imputation strategy.
# A dedicated estimator is built instead of calling setStrategy on the shared
# `imputer`, so the default-strategy estimator is not silently switched to "median"
# (which would change the result of re-running the earlier cells).
impute_median = Imputer(strategy="median", inputCols=["x", "y"], outputCols=["im_x", "im_y"])
# show() only prints and returns None, so it is kept out of the assignment above;
# `impute_median` now really holds the median estimator.
impute_median.fit(df).transform(df).show()

+----+----+----+----+
|   x|   y|im_x|im_y|
+----+----+----+----+
|10.0|10.1|10.0|10.1|
|20.0| NaN|20.0|50.1|
|30.0| NaN|30.0|50.1|
| NaN|40.1|30.0|40.1|
|50.0|50.1|50.0|50.1|
|60.0|60.1|60.0|60.1|
|70.0|70.1|70.0|70.1|
+----+----+----+----+



In [12]:
# Explicitly declaring mean as the imputation strategy (different way):
# configure the estimator, fit it, then transform, one step at a time.
# NOTE(review): setStrategy is called on the shared `imputer`, so `impute_mean`
# presumably refers to that same estimator rather than to a copy -- confirm.
impute_mean = imputer.setStrategy("mean")
model1 = impute_mean.fit(df)  # model1 is reused further down to inspect its surrogateDF
model1.transform(df).show()

+----+----+----+----+
|   x|   y|im_x|im_y|
+----+----+----+----+
|10.0|10.1|10.0|10.1|
|20.0| NaN|20.0|46.1|
|30.0| NaN|30.0|46.1|
| NaN|40.1|40.0|40.1|
|50.0|50.1|50.0|50.1|
|60.0|60.1|60.0|60.1|
|70.0|70.1|70.0|70.1|
+----+----+----+----+



In [None]:
# The outputs above (In [10] vs. In [12]) and the next two cells show that the default strategy used by the Imputer is the mean.

In [13]:
# surrogateDF holds the value used to replace the missing entries of each column;
# here it is shown for the model fitted with the default strategy.
model.surrogateDF.show()

+----+----+
|   x|   y|
+----+----+
|40.0|46.1|
+----+----+



In [14]:
# Same check for the model fitted with the explicit "mean" strategy.
model1.surrogateDF.show()

+----+----+
|   x|   y|
+----+----+
|40.0|46.1|
+----+----+



In [15]:
# Replacement (surrogate) values when the strategy is "median".
# A dedicated estimator is constructed rather than calling setStrategy on the shared
# `imputer`: that call would also flip any other name bound to the same estimator
# (e.g. `impute_mean`) to "median" and leave hidden state behind for re-runs.
impute_median = Imputer(strategy="median", inputCols=["x", "y"], outputCols=["im_x", "im_y"])
model2 = impute_median.fit(df)
# One row with the per-column median computed over the non-missing values.
model2.surrogateDF.show()

+----+----+
|   x|   y|
+----+----+
|30.0|50.1|
+----+----+

