### Imports

In [159]:
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import Imputer

import pyspark.sql.functions as F

spark = SparkSession\
                    .builder\
                    .master('local')\
                    .appName("pyspark local testing")\
                    .getOrCreate()

df = spark.read.csv("../datasets/ContaminatedBoston.csv", inferSchema=True, header=True)

# Only a couple missing values initially.
# Easier to compare imputation methods with them gone
df = df.dropna()
df.show()

AnalysisException: 'Path does not exist: file:/Users/kuba/development/datasets/ContaminatedBoston.csv;'

In [3]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



### Create missing data


This was only used for initially creating the MCAR data

In [9]:
nan = float('nan')

def make_5_percent_null(x):
    if random.uniform(0, 1) < 0.05:
        return nan
    return x

def make_10_percent_null(x):
    if random.uniform(0, 1) < 0.1:
        return nan
    return x

def make_15_percent_null(x):
    if random.uniform(0, 1) < 0.15:
        return nan
    return x

IndentationError: expected an indented block (<ipython-input-9-a8a4a96667c0>, line 3)

In [135]:
df_miss = spark.read.csv("boston_housing_mcar.csv", inferSchema=True, header=True)\
               .replace(float("nan"), None, subset=["zn_miss", "crim_miss", "indus_miss"])

df_miss.show()
#
# used for creating the missing data
#
# null_5pct = F.udf(make_5_percent_null, FloatType())
# null_10pct = F.udf(make_10_percent_null, FloatType())
# null_15pct = F.udf(make_15_percent_null, FloatType())

# df_miss = df

# df_miss = df_miss.withColumn("crim_miss", null_5pct("crim"))\
#                  .withColumn("zn_miss", null_10pct("zn"))\
#                  .withColumn("indus_miss", null_15pct("indus"))

# df_miss.show()

# df_miss.write.format("csv").option("header", "true").save("boston_housing_mcar.csv")

+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------+-------+----------+
|_c0|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio| black|lstat|medv|crim_miss|zn_miss|indus_miss|
+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------+-------+----------+
|  5|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|  0.06905|    0.0|      null|
|  6|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|  0.02985|    0.0|      2.18|
|  7|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|  0.08829|   12.5|      7.87|
|  8|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|  0.14455|   12.5|      7.87|
|  9|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2|386.63|29.93|16.5|  0.21124|   12.5|      null|
| 10|0.17004|12.5| 7.87|   0|0.524|6.004| 85.9|6.5921|  

In [137]:
# counts of missing values per column

df_miss.select(
    [F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)\
     for c in df_miss.columns])\
.show()

+---+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+----+---------+-------+----------+
|_c0|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|black|lstat|medv|crim_miss|zn_miss|indus_miss|
+---+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+----+---------+-------+----------+
|  0|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|      0|    0|    0|   0|       26|     56|        91|
+---+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+----+---------+-------+----------+



In [138]:
# Missing data columns

crim_miss, zn_miss, indus_miss = "crim_miss", "zn_miss", "indus_miss"

# Linear Regression Imputation

### No initial imputation

In [139]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["crim", "indus", "chas", "nox"], outputCol="features")
output = assembler.transform(df_miss)

+--------------------+----+
|            features|  zn|
+--------------------+----+
|[0.06905,2.18,0.0...| 0.0|
|[0.02985,2.18,0.0...| 0.0|
|[0.08829,7.87,0.0...|12.5|
|[0.14455,7.87,0.0...|12.5|
|[0.21124,7.87,0.0...|12.5|
+--------------------+----+
only showing top 5 rows



In [140]:
final_data = output.select('features', 'zn')
final_data.show()

+--------------------+----+
|            features|  zn|
+--------------------+----+
|[0.06905,2.18,0.0...| 0.0|
|[0.02985,2.18,0.0...| 0.0|
|[0.08829,7.87,0.0...|12.5|
|[0.14455,7.87,0.0...|12.5|
|[0.21124,7.87,0.0...|12.5|
|[0.17004,7.87,0.0...|12.5|
|[0.22489,7.87,0.0...|12.5|
|[0.11747,7.87,0.0...|12.5|
|[0.09378,7.87,0.0...|12.5|
|[0.62976,8.14,0.0...| 0.0|
|[0.63796,8.14,0.0...| 0.0|
|[0.62739,8.14,0.0...| 0.0|
|[1.05393,8.14,0.0...| 0.0|
|[0.7842,8.14,0.0,...| 0.0|
|[0.80271,8.14,0.0...| 0.0|
|[0.7258,8.14,0.0,...| 0.0|
|[1.25179,8.14,0.0...| 0.0|
|[0.85204,8.14,0.0...| 0.0|
|[1.23247,8.14,0.0...| 0.0|
|[0.98843,8.14,0.0...| 0.0|
+--------------------+----+
only showing top 20 rows



In [141]:
from pyspark.ml.regression import LinearRegression

lm = LinearRegression(featuresCol="features", labelCol="zn")

trained_model = lm.fit(final_data)

model_results = trained_model.evaluate(final_data)
no_impute_coefs = trained_model.coefficients
no_impute_coefs

DenseVector([0.1542, -1.1923, 0.571, -55.9202])

In [143]:
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.functions import when, col, lit, udf

In [144]:
def vec_col(x):
    return udf(lambda: x, VectorUDT())()

def dot_prod(vec1, vec2):
    # NEEDS to be wrapped in float(). py4j can't translate numpy.float64 --> float in Java.
    return float(vec1.dot(vec2))

udf_dot_prod = udf(lambda vec1, vec2: dot_prod(vec1, vec2), FloatType())

In [146]:
# output = output.withColumn("zn_miss", col("zn_miss").cast(FloatType()))


output = output\
                .withColumn("coefs", vec_col(no_impute_coefs))\
                .withColumn("zn_miss",
                            when(col("zn_miss").isNull(), udf_dot_prod(col("coefs"), col("features")))
                            .otherwise(col("zn_miss")))


+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------+------------------+----------+--------------------+--------------------+
|_c0|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio| black|lstat|medv|crim_miss|           zn_miss|indus_miss|            features|               coefs|
+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------+------------------+----------+--------------------+--------------------+
|  5|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|  0.06905|               0.0|      null|[0.06905,2.18,0.0...|[0.15419254703355...|
|  6|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|  0.02985|               0.0|      2.18|[0.02985,2.18,0.0...|[0.15419254703355...|
|  7|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|  0.08829|              12.5|      7.87|[0.08829,7.87,0.0

### Imputation Assessment

**TODO:** more research into what metrics to include for imputation assessment

https://stefvanbuuren.name/fimd/sec-evaluation.html

https://www.frontiersin.org/articles/10.3389/fgene.2019.00034/full

https://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html#module-pyspark.ml.evaluation


In [151]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="zn_miss", labelCol="zn")

In [152]:
# RMSE
evaluator.evaluate(output) 

19.782330299741574

### With initial imputation

In [153]:
imputer = Imputer(strategy="mean",
                  inputCols=["crim_miss", "indus_miss", "zn_miss"],
                  outputCols=["crim_miss_imp", "indus_miss_imp", "zn_miss_imp"])

df_miss_imp = imputer.fit(df_miss).transform(df_miss)
df_miss_imp.show()

+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------+-------+----------+-------------+------------------+-----------------+
|_c0|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio| black|lstat|medv|crim_miss|zn_miss|indus_miss|crim_miss_imp|    indus_miss_imp|      zn_miss_imp|
+---+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+---------+-------+----------+-------------+------------------+-----------------+
|  5|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|  0.06905|    0.0|      null|      0.06905|10.781411192214094|              0.0|
|  6|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|  0.02985|    0.0|      2.18|      0.02985|              2.18|              0.0|
|  7|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|  0.08829|   12.5|      7.87|      0.08829|              7.87|     

In [154]:
assembler = VectorAssembler(inputCols=["crim_miss_imp", "indus_miss_imp", "chas", "nox"], outputCol="features")
output = assembler.transform(df_miss_imp)

In [155]:
final_data = output.select('features', 'zn_miss_imp')

In [156]:
lm = LinearRegression(featuresCol="features", labelCol="zn_miss_imp")

trained_model = lm.fit(final_data)

model_results = trained_model.evaluate(final_data)
impute_coefs = trained_model.coefficients
impute_coefs

DenseVector([0.1195, -0.9986, 0.0404, -58.228])

In [157]:
output = output\
                .withColumn("coefs", vec_col(impute_coefs))\
                .withColumn("zn_miss",
                            when(col("zn_miss").isNull(), udf_dot_prod(col("coefs"), col("features")))
                            .otherwise(col("zn_miss")))


In [158]:
evaluator = RegressionEvaluator(predictionCol="zn_miss", labelCol="zn")

# RMSE
evaluator.evaluate(output) 

19.50352712741658

## Initial Imputation vs No Initial Imputation

Without initial imputation on `zn_miss`, the RMSE is: 19.782330299741574

With initial imputation on `zn_miss`, the RMSE is: 19.50352712741658

Which in this unscientific example, is a ~1.41% improvement.

# Next Steps

1. Build out the Linear Regression Imputer class
    1. Create tests
        
2. Look into imputation diagnostics and inference of models in the context of imputation
    1. Create general function / class for basic summary statistics for columns before/after imputation.