# Lesson Three: Clean Data with PySpark
### By Samuel Ko

In [17]:
# - The aim of this lesson is to explore data transformation
# - Learn how to drop table rows and columns
# - Explore the various parameters of the drop functions
# - Handle missing values by imputing the mean

In [18]:
import pyspark

In [19]:
from pyspark.sql import SparkSession

In [20]:
# Build (or reuse) the SparkSession used by every operation in this lesson;
# the session object is bound to the variable `spark`.
spark = (
    SparkSession.builder
    .appName('dataTransformation')
    .getOrCreate()
)

In [21]:
# Evaluate the session as the cell's last expression so the notebook displays it
spark

In [22]:
# Load test3.csv into a DataFrame: the first row supplies the column names
# and Spark infers each column's type from the data.
df_spark = spark.read.csv(
    'test3.csv',
    header=True,
    inferSchema=True,
)

In [23]:
# Display the DataFrame to find the rows and columns that contain null values
df_spark.show()

+--------------+----+-------------------+----------+------+
|          Name| Age|      Qualification|Experience|Salary|
+--------------+----+-------------------+----------+------+
|Kennith Kawaki|null|            Diploma|      null|  null|
|   Sudans Wong|  31|         Apprentice|         6| 40000|
|   Polly Combo|  27|            Diploma|         4| 35000|
|Desmond Cheung|  51|University Graduate|        22| 88000|
|     Chariotte|  35|            Diploma|         8| 61000|
|        Sophia|null|         Apprentice|         3| 30000|
|          null|  25|            Diploma|         3| 28000|
|  George Woody|  31|               null|         7| 47000|
|    Pansy Rose|  33|         Apprentice|      null|  null|
|          John|  17|         Apprentice|         1| 24300|
|         Peter|  45|University Graduate|        18| 81000|
|         David|  42|University Graduate|        15| 76000|
|        Usamha|  28|            Diploma|         4| 33000|
|    Zain Amjun|  34|         Apprentice

In [24]:
# Drop the specified column 'Experience'.
# drop() returns a new DataFrame; df_spark itself still has the column.
df_spark.drop('Experience').show()

+--------------+----+-------------------+------+
|          Name| Age|      Qualification|Salary|
+--------------+----+-------------------+------+
|Kennith Kawaki|null|            Diploma|  null|
|   Sudans Wong|  31|         Apprentice| 40000|
|   Polly Combo|  27|            Diploma| 35000|
|Desmond Cheung|  51|University Graduate| 88000|
|     Chariotte|  35|            Diploma| 61000|
|        Sophia|null|         Apprentice| 30000|
|          null|  25|            Diploma| 28000|
|  George Woody|  31|               null| 47000|
|    Pansy Rose|  33|         Apprentice|  null|
|          John|  17|         Apprentice| 24300|
|         Peter|  45|University Graduate| 81000|
|         David|  42|University Graduate| 76000|
|        Usamha|  28|            Diploma| 33000|
|    Zain Amjun|  34|         Apprentice| 55000|
|  May Mackency|  29|            Diploma| 39000|
| Edwards Colin|  26|         Apprentice| 31000|
|  Megan Guydry|null|University Graduate| 72000|
|Raymon Almeida|  25

In [25]:
# Drop rows that have fewer than `thresh` non-null values.
# When `thresh` is given it overrides `how`, so `how` is omitted here
# (passing how='any' together with thresh has no effect).
# With thresh=3 a row is kept only if at least 3 of its fields are non-null;
# e.g. the first record ('Kennith Kawaki') has just two non-null values
# (Name and Qualification) and is therefore dropped.
# df_spark.na.drop(thresh=3).show() is the equivalent call.
df_spark.dropna(thresh=3).show()

+--------------+----+-------------------+----------+------+
|          Name| Age|      Qualification|Experience|Salary|
+--------------+----+-------------------+----------+------+
|   Sudans Wong|  31|         Apprentice|         6| 40000|
|   Polly Combo|  27|            Diploma|         4| 35000|
|Desmond Cheung|  51|University Graduate|        22| 88000|
|     Chariotte|  35|            Diploma|         8| 61000|
|        Sophia|null|         Apprentice|         3| 30000|
|          null|  25|            Diploma|         3| 28000|
|  George Woody|  31|               null|         7| 47000|
|    Pansy Rose|  33|         Apprentice|      null|  null|
|          John|  17|         Apprentice|         1| 24300|
|         Peter|  45|University Graduate|        18| 81000|
|         David|  42|University Graduate|        15| 76000|
|        Usamha|  28|            Diploma|         4| 33000|
|    Zain Amjun|  34|         Apprentice|      null| 55000|
|  May Mackency|  29|            Diploma

In [26]:
# Drop every row that contains at least one null value.
# df_spark.dropna(how='any').show() gives the same result.
df_spark.na.drop(how='any').show()

+--------------+---+-------------------+----------+------+
|          Name|Age|      Qualification|Experience|Salary|
+--------------+---+-------------------+----------+------+
|   Sudans Wong| 31|         Apprentice|         6| 40000|
|   Polly Combo| 27|            Diploma|         4| 35000|
|Desmond Cheung| 51|University Graduate|        22| 88000|
|     Chariotte| 35|            Diploma|         8| 61000|
|          John| 17|         Apprentice|         1| 24300|
|         Peter| 45|University Graduate|        18| 81000|
|         David| 42|University Graduate|        15| 76000|
|        Usamha| 28|            Diploma|         4| 33000|
|  May Mackency| 29|            Diploma|         5| 39000|
| Edwards Colin| 26|         Apprentice|         3| 31000|
|Raymon Almeida| 25|            Diploma|         2| 26500|
+--------------+---+-------------------+----------+------+



In [27]:
# Drop only the rows that have a null in the 'Name' column;
# nulls in the other columns do not cause a row to be dropped.
df_spark.na.drop(how='any', subset=['Name']).show()

+--------------+----+-------------------+----------+------+
|          Name| Age|      Qualification|Experience|Salary|
+--------------+----+-------------------+----------+------+
|Kennith Kawaki|null|            Diploma|      null|  null|
|   Sudans Wong|  31|         Apprentice|         6| 40000|
|   Polly Combo|  27|            Diploma|         4| 35000|
|Desmond Cheung|  51|University Graduate|        22| 88000|
|     Chariotte|  35|            Diploma|         8| 61000|
|        Sophia|null|         Apprentice|         3| 30000|
|  George Woody|  31|               null|         7| 47000|
|    Pansy Rose|  33|         Apprentice|      null|  null|
|          John|  17|         Apprentice|         1| 24300|
|         Peter|  45|University Graduate|        18| 81000|
|         David|  42|University Graduate|        15| 76000|
|        Usamha|  28|            Diploma|         4| 33000|
|    Zain Amjun|  34|         Apprentice|      null| 55000|
|  May Mackency|  29|            Diploma

In [28]:
# Reload the CSV with schema inference enabled
df_spark = spark.read.csv(
    'test3.csv',
    header=True,
    inferSchema=True,
)

## Fill the missing values
## A string fill value is applied only to string columns, so the nulls in
## 'Name' and 'Qualification' become 'Missing One', while the nulls in
## 'Age', 'Experience' and 'Salary' are left as null.
df_spark.na.fill('Missing One').show()

+--------------+----+-------------------+----------+------+
|          Name| Age|      Qualification|Experience|Salary|
+--------------+----+-------------------+----------+------+
|Kennith Kawaki|null|            Diploma|      null|  null|
|   Sudans Wong|  31|         Apprentice|         6| 40000|
|   Polly Combo|  27|            Diploma|         4| 35000|
|Desmond Cheung|  51|University Graduate|        22| 88000|
|     Chariotte|  35|            Diploma|         8| 61000|
|        Sophia|null|         Apprentice|         3| 30000|
|   Missing One|  25|            Diploma|         3| 28000|
|  George Woody|  31|        Missing One|         7| 47000|
|    Pansy Rose|  33|         Apprentice|      null|  null|
|          John|  17|         Apprentice|         1| 24300|
|         Peter|  45|University Graduate|        18| 81000|
|         David|  42|University Graduate|        15| 76000|
|        Usamha|  28|            Diploma|         4| 33000|
|    Zain Amjun|  34|         Apprentice

In [29]:
# Reload the CSV, this time without schema inference
df_spark = spark.read.csv(
    'test3.csv',
    header=True,
    inferSchema=False,
)

## Fill the missing values, but only in the listed columns;
## nulls in 'Name', 'Qualification' and 'Salary' are left untouched.
df_spark.na.fill('Missing One', subset=['Age', 'Experience']).show()

+--------------+-----------+-------------------+-----------+------+
|          Name|        Age|      Qualification| Experience|Salary|
+--------------+-----------+-------------------+-----------+------+
|Kennith Kawaki|Missing One|            Diploma|Missing One|  null|
|   Sudans Wong|         31|         Apprentice|          6| 40000|
|   Polly Combo|         27|            Diploma|          4| 35000|
|Desmond Cheung|         51|University Graduate|         22| 88000|
|     Chariotte|         35|            Diploma|          8| 61000|
|        Sophia|Missing One|         Apprentice|          3| 30000|
|          null|         25|            Diploma|          3| 28000|
|  George Woody|         31|               null|          7| 47000|
|    Pansy Rose|         33|         Apprentice|Missing One|  null|
|          John|         17|         Apprentice|          1| 24300|
|         Peter|         45|University Graduate|         18| 81000|
|         David|         42|University Graduate|

In [30]:
# Show the DataFrame as it currently stands, before it is reloaded below
df_spark.show()

from pyspark.ml.feature import Imputer

# Reload the CSV with schema inference enabled
df_spark = spark.read.csv(
    'test3.csv',
    header=True,
    inferSchema=True,
)

# A mean cannot be taken on text fields such as 'Name', so only
# 'Age', 'Experience' and 'Salary' are imputed. Each input column gets
# a matching '<column>_imputed' output column.
impute_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{c}_imputed" for c in impute_cols],
    strategy="mean",
)

+--------------+----+-------------------+----------+------+
|          Name| Age|      Qualification|Experience|Salary|
+--------------+----+-------------------+----------+------+
|Kennith Kawaki|null|            Diploma|      null|  null|
|   Sudans Wong|  31|         Apprentice|         6| 40000|
|   Polly Combo|  27|            Diploma|         4| 35000|
|Desmond Cheung|  51|University Graduate|        22| 88000|
|     Chariotte|  35|            Diploma|         8| 61000|
|        Sophia|null|         Apprentice|         3| 30000|
|          null|  25|            Diploma|         3| 28000|
|  George Woody|  31|               null|         7| 47000|
|    Pansy Rose|  33|         Apprentice|      null|  null|
|          John|  17|         Apprentice|         1| 24300|
|         Peter|  45|University Graduate|        18| 81000|
|         David|  42|University Graduate|        15| 76000|
|        Usamha|  28|            Diploma|         4| 33000|
|    Zain Amjun|  34|         Apprentice

In [31]:
# Fit the imputer on df_spark, then add the imputation columns
# (the '*_imputed' columns) to the DataFrame and display the result.
imputer_model = imputer.fit(df_spark)
imputer_model.transform(df_spark).show()

+--------------+----+-------------------+----------+------+-----------+------------------+--------------+
|          Name| Age|      Qualification|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------------+----+-------------------+----------+------+-----------+------------------+--------------+
|Kennith Kawaki|null|            Diploma|      null|  null|         31|                 7|         47925|
|   Sudans Wong|  31|         Apprentice|         6| 40000|         31|                 6|         40000|
|   Polly Combo|  27|            Diploma|         4| 35000|         27|                 4|         35000|
|Desmond Cheung|  51|University Graduate|        22| 88000|         51|                22|         88000|
|     Chariotte|  35|            Diploma|         8| 61000|         35|                 8|         61000|
|        Sophia|null|         Apprentice|         3| 30000|         31|                 3|         30000|
|          null|  25|            Diploma|     