# Tutorial 4: Data Cleaning using PySpark

In [1]:
import warnings
warnings.filterwarnings('ignore')

### 1. Starting the Spark session

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the SparkSession that every later cell uses.
spark = (
    SparkSession.builder
    .appName('data_cleaning_session')
    .getOrCreate()
)

24/09/14 20:17:00 WARN Utils: Your hostname, Rishas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.45.141 instead (on interface en0)
24/09/14 20:17:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/14 20:17:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Display the active SparkSession object (bare last expression of the cell).
spark

### 2. Loading the dataset

In [4]:
# Read the CSV: the first row supplies the column names, and inferSchema asks
# Spark to scan the data and pick column types (checked with printSchema later).
df = spark.read.options(header=True, inferSchema=True).csv('demo_data.csv')

In [5]:
# Print the raw data; missing values are shown as NULL.
df.show()

+------+----+---------------------+------+
|Degree| Age|Experience (in years)|Salary|
+------+----+---------------------+------+
|    BE|  25|                    0|     0|
|    BE|  27|                    2| 15000|
|    ME|  34|                    7| 50000|
|   BBA|  26|                    1|  1000|
|  NULL|  24|                    1| 15000|
|   BSC|NULL|                    4| 20000|
|   MSC|  31|                    6| 45000|
| Mtech|NULL|                    3|  NULL|
|    BE|  28|                 NULL| 13500|
+------+----+---------------------+------+



### 3. Checking for null values

In [6]:
from pyspark.sql.functions import col, when, count

# One expression per column counting its NULLs: when() without otherwise()
# yields a non-null value only on rows where the column is NULL, and count()
# counts non-null values. Note: isNull() matches SQL NULL only; NaN values in
# float/double columns would need isnan() to be counted.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
df.select(*null_count_exprs).show()

+------+---+---------------------+------+
|Degree|Age|Experience (in years)|Salary|
+------+---+---------------------+------+
|     1|  2|                    1|     1|
+------+---+---------------------+------+



In [7]:
# Show the column types chosen by inferSchema=True when the CSV was read.
df.printSchema()

root
 |-- Degree: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience (in years): integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [8]:
# Alias the original DataFrame under a new name. This is NOT a copy, and none
# is needed: Spark DataFrames are immutable, so the transformations below
# return new DataFrames and `df` itself is never modified.
df1 = df

**Method 1 - Dropping all rows that contain null values**

In [9]:
# Remove every row that has a NULL in any column.
df1 = df1.dropna(how='any')

In [10]:
# Re-count NULLs per column to confirm that dropna removed all of them.
df1_null_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df1.columns
]
df1.select(*df1_null_exprs).show()

+------+---+---------------------+------+
|Degree|Age|Experience (in years)|Salary|
+------+---+---------------------+------+
|     0|  0|                    0|     0|
+------+---+---------------------+------+



**Method 2 - Dropping rows with null values in specific columns**

In [11]:
# Drop only the rows whose Degree is NULL; NULLs in the other columns are kept.
df2 = df.dropna(subset=['Degree'])

In [12]:
# Re-count NULLs: Degree should now be 0 while the other columns keep theirs.
df2_null_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df2.columns
]
df2.select(*df2_null_exprs).show()

+------+---+---------------------+------+
|Degree|Age|Experience (in years)|Salary|
+------+---+---------------------+------+
|     0|  2|                    1|     1|
+------+---+---------------------+------+



**Method 3 - Substituting values for NULL data**

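**3.1 For string columns**

A string fill value only replaces nulls in string-typed columns; nulls in the integer columns are left untouched.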

In [16]:
# A string fill value only replaces NULLs in string-typed columns (here: Degree);
# NULLs in the integer columns are left as they are.
df3 = df.na.fill('Missing')  # equivalent: df.fillna('Missing')

In [17]:
# Degree's NULL became 'Missing'; the integer columns still contain NULLs.
df3.show()

+-------+----+---------------------+------+
| Degree| Age|Experience (in years)|Salary|
+-------+----+---------------------+------+
|     BE|  25|                    0|     0|
|     BE|  27|                    2| 15000|
|     ME|  34|                    7| 50000|
|    BBA|  26|                    1|  1000|
|Missing|  24|                    1| 15000|
|    BSC|NULL|                    4| 20000|
|    MSC|  31|                    6| 45000|
|  Mtech|NULL|                    3|  NULL|
|     BE|  28|                 NULL| 13500|
+-------+----+---------------------+------+



**3.2 For integer columns - Filling all null values with a constant value**

In [18]:
# A numeric fill value applies to numeric columns, so every remaining NULL in
# Age, Experience (in years) and Salary becomes 0.
df3 = df3.fillna(value=0)

In [19]:
# Every NULL is now filled: 'Missing' in Degree, 0 in the integer columns.
df3.show()

+-------+---+---------------------+------+
| Degree|Age|Experience (in years)|Salary|
+-------+---+---------------------+------+
|     BE| 25|                    0|     0|
|     BE| 27|                    2| 15000|
|     ME| 34|                    7| 50000|
|    BBA| 26|                    1|  1000|
|Missing| 24|                    1| 15000|
|    BSC|  0|                    4| 20000|
|    MSC| 31|                    6| 45000|
|  Mtech|  0|                    3|     0|
|     BE| 28|                    0| 13500|
+-------+---+---------------------+------+



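**3.3 For integer columns - Imputation (filling null values with a statistic of that column)**

Note: the imputed columns keep the integer type of the input, so the mean is truncated (for example, the Age mean of about 27.86 is filled in as 27).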

In [30]:
df4 = df

# Select the integer-typed columns to impute. Match Spark's exact integer type
# names instead of a substring test: `'int' in data_type` would also match
# type strings such as 'array<int>', 'map<string,int>' or 'interval ...',
# which the Imputer cannot process.
INTEGER_TYPES = {'tinyint', 'smallint', 'int', 'bigint'}
num_cols = [col_name for col_name, data_type in df4.dtypes if data_type in INTEGER_TYPES]

In [31]:
# The integer columns that will be passed to the Imputer.
num_cols

['Age', 'Experience (in years)', 'Salary']

In [32]:
from pyspark.ml.feature import Imputer

# Write imputed values to new '<name>_imp' columns so the originals stay
# available for comparison. The output keeps the integer type of the input,
# so the mean is truncated (e.g. Age's mean 27.857... is filled in as 27).
imputed_cols = [name + '_imp' for name in num_cols]
imputer = Imputer(strategy='mean', inputCols=num_cols, outputCols=imputed_cols)

imputer_model = imputer.fit(df4)
df4 = imputer_model.transform(df4)

In [33]:
# The original columns still hold NULLs; the *_imp columns carry the imputed values.
df4.show()

+------+----+---------------------+------+-------+-------------------------+----------+
|Degree| Age|Experience (in years)|Salary|Age_imp|Experience (in years)_imp|Salary_imp|
+------+----+---------------------+------+-------+-------------------------+----------+
|    BE|  25|                    0|     0|     25|                        0|         0|
|    BE|  27|                    2| 15000|     27|                        2|     15000|
|    ME|  34|                    7| 50000|     34|                        7|     50000|
|   BBA|  26|                    1|  1000|     26|                        1|      1000|
|  NULL|  24|                    1| 15000|     24|                        1|     15000|
|   BSC|NULL|                    4| 20000|     27|                        4|     20000|
|   MSC|  31|                    6| 45000|     31|                        6|     45000|
| Mtech|NULL|                    3|  NULL|     27|                        3|     19937|
|    BE|  28|                 NU

In [36]:
# Compare statistics of the original vs. imputed columns: the *_imp columns have
# no NULLs (count 9), and their means differ slightly from the originals because
# the imputed values were truncated to integers.
df4.describe().show()

24/09/14 20:26:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------+------------------+---------------------+------------------+------------------+-------------------------+------------------+
|summary|Degree|               Age|Experience (in years)|            Salary|           Age_imp|Experience (in years)_imp|        Salary_imp|
+-------+------+------------------+---------------------+------------------+------------------+-------------------------+------------------+
|  count|     8|                 7|                    8|                 8|                 9|                        9|                 9|
|   mean|  NULL|27.857142857142858|                  3.0|           19937.5|27.666666666666668|                      3.0|19937.444444444445|
| stddev|  NULL| 3.532165125838609|   2.5071326821120348|18432.187491295917| 3.082207001484489|        2.345207879911715| 17241.73262110794|
|    min|   BBA|                24|                    0|                 0|                24|                        0|                 0|
|    max| Mte

In [37]:
# Remove the original numeric columns (which still contain NULLs); only their
# imputed *_imp counterparts are kept.
for name in num_cols:
    df4 = df4.drop(name)

In [38]:
# Final result. Degree still contains its NULL; only the integer columns were imputed.
df4.show()

+------+-------+-------------------------+----------+
|Degree|Age_imp|Experience (in years)_imp|Salary_imp|
+------+-------+-------------------------+----------+
|    BE|     25|                        0|         0|
|    BE|     27|                        2|     15000|
|    ME|     34|                        7|     50000|
|   BBA|     26|                        1|      1000|
|  NULL|     24|                        1|     15000|
|   BSC|     27|                        4|     20000|
|   MSC|     31|                        6|     45000|
| Mtech|     27|                        3|     19937|
|    BE|     28|                        3|     13500|
+------+-------+-------------------------+----------+

