## PySpark: handling missing values
- dropping columns
- dropping rows
- the various parameters of the drop functionality (`how`, `thresh`, `subset`)
- filling missing values with a constant, and handling them by mean, median and mode imputation

In [1]:
from pyspark.sql import SparkSession
# Get the notebook's Spark session: getOrCreate() reuses an already-running
# session if there is one, otherwise it builds a new one named 'Practise'.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)

In [2]:
# Load the sample CSV: the first row is used as the header and column types are
# inferred from the data. Note the header cell of the first column is literally
# 'name ' (with a trailing space), as the printed schema shows.
df_pyspark = spark.read.csv(
    'test3.csv',
    header=True,
    inferSchema=True,
)
df_pyspark

DataFrame[name : string, age: int, experience: int, salary: int]

In [3]:
# Print the rows (n=20 is Spark's default row limit) so the nulls in each column are visible.
df_pyspark.show(n=20)

+-----+----+----------+------+
|name | age|experience|salary|
+-----+----+----------+------+
|  ola|  25|         8| 10000|
|kasia|  44|        10|  2000|
| kuba|  55|         7|  4000|
| bolo|  11|         8|  6000|
|asdad|  22|      null| 33333|
| gggg|  44|      null|  null|
|   tt|null|      null|  null|
|ddddd|  33|      null|  null|
|  rrr|  22|        11| 33546|
| null|  44|      null|  null|
+-----+----+----------+------+



In [4]:
## Drop a column. drop() returns a new DataFrame; df_pyspark itself is not modified.
# The column really is called 'name ' (trailing space, see the schema printed above).
# drop() with a string that matches no column is a silent no-op, so the name must match exactly.
(
    df_pyspark
    .drop('name ')
    .show()
)


+----+----------+------+
| age|experience|salary|
+----+----------+------+
|  25|         8| 10000|
|  44|        10|  2000|
|  55|         7|  4000|
|  11|         8|  6000|
|  22|      null| 33333|
|  44|      null|  null|
|null|      null|  null|
|  33|      null|  null|
|  22|        11| 33546|
|  44|      null|  null|
+----+----------+------+



In [5]:
# The drop() above produced a new DataFrame, so df_pyspark still has all four columns.
df_pyspark.show(n=20)

+-----+----+----------+------+
|name | age|experience|salary|
+-----+----+----------+------+
|  ola|  25|         8| 10000|
|kasia|  44|        10|  2000|
| kuba|  55|         7|  4000|
| bolo|  11|         8|  6000|
|asdad|  22|      null| 33333|
| gggg|  44|      null|  null|
|   tt|null|      null|  null|
|ddddd|  33|      null|  null|
|  rrr|  22|        11| 33546|
| null|  44|      null|  null|
+-----+----+----------+------+



In [6]:
## Drop rows with nulls: called with no arguments, na.drop() removes every row
## that contains at least one null value.
(
    df_pyspark
    .na.drop()
    .show()
)

+-----+---+----------+------+
|name |age|experience|salary|
+-----+---+----------+------+
|  ola| 25|         8| 10000|
|kasia| 44|        10|  2000|
| kuba| 55|         7|  4000|
| bolo| 11|         8|  6000|
|  rrr| 22|        11| 33546|
+-----+---+----------+------+



In [10]:
### how='any' is the default: a row is dropped if ANY of its values is null,
### which is why the result matches the previous cell. how='all' would drop
### only the rows in which every value is null.
df_pyspark.na.drop(
    how='any',
).show()

+-----+---+----------+------+
|name |age|experience|salary|
+-----+---+----------+------+
|  ola| 25|         8| 10000|
|kasia| 44|        10|  2000|
| kuba| 55|         7|  4000|
| bolo| 11|         8|  6000|
|  rrr| 22|        11| 33546|
+-----+---+----------+------+



In [12]:
## thresh=3: keep only the rows that have at least 3 non-null values (out of 4 columns).
## `how` already defaults to 'any' and is ignored whenever thresh is given,
## so it is not passed here.
df_pyspark.na.drop(thresh=3).show()

+-----+---+----------+------+
|name |age|experience|salary|
+-----+---+----------+------+
|  ola| 25|         8| 10000|
|kasia| 44|        10|  2000|
| kuba| 55|         7|  4000|
| bolo| 11|         8|  6000|
|asdad| 22|      null| 33333|
|  rrr| 22|        11| 33546|
+-----+---+----------+------+



In [13]:
## subset: only the listed columns are checked for nulls. Here a row is dropped
## when its 'experience' is null; nulls in the other columns are ignored.
df_pyspark.na.drop(subset=['experience'], how='any').show()

+-----+---+----------+------+
|name |age|experience|salary|
+-----+---+----------+------+
|  ola| 25|         8| 10000|
|kasia| 44|        10|  2000|
| kuba| 55|         7|  4000|
| bolo| 11|         8|  6000|
|  rrr| 22|        11| 33546|
+-----+---+----------+------+



In [22]:
### Filling the missing values with a constant.
### A string fill value is applied only to string columns, so only the null in
### 'name ' becomes 'missing'; the nulls in the int columns stay null.
df_pyspark.na.fill(value='missing').show()

+--------+----+----------+------+
|   name | age|experience|salary|
+--------+----+----------+------+
|     ola|  25|         8| 10000|
|   kasia|  44|        10|  2000|
|    kuba|  55|         7|  4000|
|    bolo|  11|         8|  6000|
|   asdad|  22|      null| 33333|
|    gggg|  44|      null|  null|
|      tt|null|      null|  null|
|   ddddd|  33|      null|  null|
|     rrr|  22|        11| 33546|
|missing |  44|      null|  null|
+--------+----+----------+------+



In [31]:
# Re-display the schema of the original frame: it is unchanged by the cells above
# ('name ' string column plus int age/experience/salary).
df_pyspark

DataFrame[name : string, age: int, experience: int, salary: int]

In [86]:
# Cast the int columns to float so the Imputer below can be applied to them.
# (pyspark.ml.feature.Imputer in Spark 2.x only accepts float/double input columns.)
# FloatType must be imported here; without it this cell raises NameError on a fresh kernel.
from pyspark.sql.types import FloatType, StringType

# Start from the loaded frame and replace each numeric column with its float cast.
df_pyspark2 = df_pyspark
for col_name in ['age', 'salary', 'experience']:
    # withColumn with an existing column name replaces that column and keeps its position.
    df_pyspark2 = df_pyspark2.withColumn(col_name, df_pyspark2[col_name].cast(FloatType()))

df_pyspark2

DataFrame[name : string, age: float, experience: float, salary: float]

In [79]:
## Fill missing numeric values with a per-column statistic using Spark ML's Imputer
## (pyspark.ml.feature.Imputer -- not scikit-learn). With strategy="mean", each null
## in an input column is replaced by the mean of that column's non-null values.
## Results go into new '<col>_imputed' columns; the original columns are kept.
from pyspark.ml.feature import Imputer

# Single source of truth for the columns to impute, so inputCols and outputCols stay aligned.
imputer_cols = ['age', 'experience', 'salary']
imputer = Imputer(
    inputCols=imputer_cols,
    outputCols=[f"{c}_imputed" for c in imputer_cols],
    strategy="mean",  # "median" is another supported strategy
)

In [87]:
# Fit the imputer on df_pyspark2 (this computes the per-column fill values), then
# apply it to the same frame; the *_imputed columns are appended after the originals.
(
    imputer
    .fit(df_pyspark2)
    .transform(df_pyspark2)
    .show()
)

+-----+----+----------+-------+-----------+------------------+--------------+
|name | age|experience| salary|age_imputed|experience_imputed|salary_imputed|
+-----+----+----------+-------+-----------+------------------+--------------+
|  ola|25.0|       8.0|10000.0|       25.0|               8.0|       10000.0|
|kasia|44.0|      10.0| 2000.0|       44.0|              10.0|        2000.0|
| kuba|55.0|       7.0| 4000.0|       55.0|               7.0|        4000.0|
| bolo|11.0|       8.0| 6000.0|       11.0|               8.0|        6000.0|
|asdad|22.0|      null|33333.0|       22.0|               8.8|       33333.0|
| gggg|44.0|      null|   null|       44.0|               8.8|     14813.167|
|   tt|null|      null|   null|  33.333332|               8.8|     14813.167|
|ddddd|33.0|      null|   null|       33.0|               8.8|     14813.167|
|  rrr|22.0|      11.0|33546.0|       22.0|              11.0|       33546.0|
| null|44.0|      null|   null|       44.0|               8.8|  