# Handle missing values

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
# Reuse the active Spark session if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .appName("Practise")
    .getOrCreate()
)

In [22]:
# Load the salary data: column names come from the first line of the file and
# Spark infers each column's type instead of reading everything as strings.
df = spark.read.csv(
    path="base_salary.csv",
    inferSchema=True,
    header=True,
)

In [23]:
# Print the DataFrame as a text table to check what was loaded.
df.show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Roger|24.0|         2| 24000|
|   Marie|40.0|        10| 40000|
|  Fatima|47.0|         7| 80000|
|    Alex|26.0|         3| 38000|
|   sunny|24.0|         2|  NULL|
|    NULL|25.0|      NULL|  NULL|
|  Robert|50.0|      NULL|  NULL|
|    NULL|48.0|      NULL| 60000|
|Amandine|NULL|         5|  NULL|
|    NULL|NULL|      NULL|  NULL|
|  Armine|NULL|      NULL|  NULL|
+--------+----+----------+------+



In [24]:
# Summary statistics per column. "count" only counts non-null values, so it
# shows how many entries are missing in each column.
df.describe().show()

+-------+-----+------------------+------------------+------------------+
|summary| name|               age|        experience|            salary|
+-------+-----+------------------+------------------+------------------+
|  count|    8|                 8|                 6|                 5|
|   mean| NULL|              35.5| 4.833333333333333|           48400.0|
| stddev| NULL|11.856282240712245|3.1885210782848317|21835.750502329887|
|    min| Alex|              24.0|                 2|             24000|
|    max|sunny|              50.0|                10|             80000|
+-------+-----+------------------+------------------+------------------+



## Drop rows with null values

In [25]:
# With no arguments, na.drop() removes every row that has a null in any column.
df.na.drop().show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
| Roger|24.0|         2| 24000|
| Marie|40.0|        10| 40000|
|Fatima|47.0|         7| 80000|
|  Alex|26.0|         3| 38000|
+------+----+----------+------+



In [26]:
# how="any" is the default: a row is dropped as soon as one of its values is null.
df.na.drop(how="any").show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
| Roger|24.0|         2| 24000|
| Marie|40.0|        10| 40000|
|Fatima|47.0|         7| 80000|
|  Alex|26.0|         3| 38000|
+------+----+----------+------+



In [27]:
# how="all" drops a row only when every one of its values is null
# (the default is how="any").
df.na.drop(how="all").show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Roger|24.0|         2| 24000|
|   Marie|40.0|        10| 40000|
|  Fatima|47.0|         7| 80000|
|    Alex|26.0|         3| 38000|
|   sunny|24.0|         2|  NULL|
|    NULL|25.0|      NULL|  NULL|
|  Robert|50.0|      NULL|  NULL|
|    NULL|48.0|      NULL| 60000|
|Amandine|NULL|         5|  NULL|
|  Armine|NULL|      NULL|  NULL|
+--------+----+----------+------+



In [28]:
# thresh=2 keeps only the rows that have at least 2 non-null values;
# when thresh is given it takes precedence over `how`.
df.na.drop(thresh=2).show()

+--------+----+----------+------+
|    name| age|experience|salary|
+--------+----+----------+------+
|   Roger|24.0|         2| 24000|
|   Marie|40.0|        10| 40000|
|  Fatima|47.0|         7| 80000|
|    Alex|26.0|         3| 38000|
|   sunny|24.0|         2|  NULL|
|  Robert|50.0|      NULL|  NULL|
|    NULL|48.0|      NULL| 60000|
|Amandine|NULL|         5|  NULL|
+--------+----+----------+------+



## Subset in drop function

In [29]:
# subset restricts the null check to the listed columns: a row is dropped only
# if 'experience' or 'salary' is null; nulls in other columns are ignored.
# Names are spelled exactly as in the schema so the call does not rely on
# Spark's default case-insensitive column resolution.
df.na.drop(how="any", subset=['experience', 'salary']).show()

+------+----+----------+------+
|  name| age|experience|salary|
+------+----+----------+------+
| Roger|24.0|         2| 24000|
| Marie|40.0|        10| 40000|
|Fatima|47.0|         7| 80000|
|  Alex|26.0|         3| 38000|
+------+----+----------+------+



## Selecting columns and indexing

In [30]:
# List of column names, in schema order.
df.columns

['name', 'age', 'experience', 'salary']

In [31]:
# head(4) returns the first 4 rows as a list of Row objects.
df.head(4)

[Row(name='Roger', age=24.0, experience=2, salary=24000),
 Row(name='Marie', age=40.0, experience=10, salary=40000),
 Row(name='Fatima', age=47.0, experience=7, salary=80000),
 Row(name='Alex', age=26.0, experience=3, salary=38000)]

In [32]:
# Confirm that df is a Spark DataFrame.
type(df)

pyspark.sql.dataframe.DataFrame

In [33]:
# Keep only the listed columns; select() returns a new DataFrame.
df.select("name","salary").show()

+--------+------+
|    name|salary|
+--------+------+
|   Roger| 24000|
|   Marie| 40000|
|  Fatima| 80000|
|    Alex| 38000|
|   sunny|  NULL|
|    NULL|  NULL|
|  Robert|  NULL|
|    NULL| 60000|
|Amandine|  NULL|
|    NULL|  NULL|
|  Armine|  NULL|
+--------+------+



## Adding columns

In [34]:
# Add month_salary = salary / 12; rows whose salary is null get a null
# month_salary as well.
df = df.withColumn(colName="month_salary", col=df.salary / 12)

In [35]:
# Display the DataFrame again to inspect the month_salary column.
df.show()

+--------+----+----------+------+------------------+
|    name| age|experience|salary|      month_salary|
+--------+----+----------+------+------------------+
|   Roger|24.0|         2| 24000|            2000.0|
|   Marie|40.0|        10| 40000|3333.3333333333335|
|  Fatima|47.0|         7| 80000| 6666.666666666667|
|    Alex|26.0|         3| 38000|3166.6666666666665|
|   sunny|24.0|         2|  NULL|              NULL|
|    NULL|25.0|      NULL|  NULL|              NULL|
|  Robert|50.0|      NULL|  NULL|              NULL|
|    NULL|48.0|      NULL| 60000|            5000.0|
|Amandine|NULL|         5|  NULL|              NULL|
|    NULL|NULL|      NULL|  NULL|              NULL|
|  Armine|NULL|      NULL|  NULL|              NULL|
+--------+----+----------+------+------------------+



## Filling the missing values

In [43]:
# Print each column's name, type and nullability before choosing fill values.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- month_salary: double (nullable = true)



In [44]:
# na.fill(value, subset): specify the replacement value, then the column(s).
# Every null in the listed columns becomes 41; all other columns are left
# untouched. Names are spelled exactly as in the schema so the call does not
# rely on Spark's default case-insensitive column resolution.
df.na.fill(41, subset=['experience', 'age']).show()

+--------+----+----------+------+------------------+
|    name| age|experience|salary|      month_salary|
+--------+----+----------+------+------------------+
|   Roger|24.0|         2| 24000|            2000.0|
|   Marie|40.0|        10| 40000|3333.3333333333335|
|  Fatima|47.0|         7| 80000| 6666.666666666667|
|    Alex|26.0|         3| 38000|3166.6666666666665|
|   sunny|24.0|         2|  NULL|              NULL|
|    NULL|25.0|        41|  NULL|              NULL|
|  Robert|50.0|        41|  NULL|              NULL|
|    NULL|48.0|        41| 60000|            5000.0|
|Amandine|41.0|         5|  NULL|              NULL|
|    NULL|41.0|        41|  NULL|              NULL|
|  Armine|41.0|        41|  NULL|              NULL|
+--------+----+----------+------+------------------+



# Imputer function (column names are case sensitive!)

### Imputer for mean

In [70]:
from pyspark.ml.feature import Imputer
# Input column names are case sensitive: spell them exactly as in the schema.
# Each input column gets a companion "<name>_imputed" column in which nulls
# are replaced by that column's mean.
imputer_mean = Imputer(
    strategy="mean",
    inputCols=['age', 'experience', 'salary'],
    outputCols=['age_imputed', 'experience_imputed', 'salary_imputed'],
)


In [72]:
# fit() computes the replacement value (here the mean) of each input column;
# transform() appends the *_imputed columns and leaves the originals unchanged.
imputer_mean.fit(df).transform(df).show()

+--------+----+----------+------+------------------+-----------+------------------+--------------+
|    name| age|experience|salary|      month_salary|age_imputed|experience_imputed|salary_imputed|
+--------+----+----------+------+------------------+-----------+------------------+--------------+
|   Roger|24.0|         2| 24000|            2000.0|       24.0|                 2|         24000|
|   Marie|40.0|        10| 40000|3333.3333333333335|       40.0|                10|         40000|
|  Fatima|47.0|         7| 80000| 6666.666666666667|       47.0|                 7|         80000|
|    Alex|26.0|         3| 38000|3166.6666666666665|       26.0|                 3|         38000|
|   sunny|24.0|         2|  NULL|              NULL|       24.0|                 2|         48400|
|    NULL|25.0|      NULL|  NULL|              NULL|       25.0|                 4|         48400|
|  Robert|50.0|      NULL|  NULL|              NULL|       50.0|                 4|         48400|
|    NULL|

### Imputer for median

In [55]:
from pyspark.ml.feature import Imputer

# Median variant of the imputer: nulls in each input column are replaced by
# that column's median and written to a "<name>_median" column.
# Imputer matches column names case-sensitively, so they must be spelled
# exactly as in the schema ('experience', not 'Experience').
imputer_median = Imputer(
    inputCols = ['age','experience','salary'],
    outputCols= ["{}_median".format(c) for c in ['age','experience','salary']]
).setStrategy("median")