In [25]:
from pyspark.sql import SparkSession

In [26]:
# Obtain a SparkSession through the builder: an existing session is reused,
# otherwise a new one is created under the given application name.
spark = (
    SparkSession.builder
    .appName("HandelMissingValues")
    .getOrCreate()
)

In [27]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [28]:
# Read the CSV with its first line used as the header and with column types
# inferred from the file contents, then preview the resulting DataFrame.
csv_path = 'data/missing_data.csv'
data = spark.read.csv(csv_path, header=True, inferSchema=True)
data.show()

+---+-------------+--------------------+----+
| id|     fullname|               email| age|
+---+-------------+--------------------+----+
|  1|     John Doe|  john.doe@email.com|  32|
|  2|   Jane Smith|                NULL|  44|
|  3| Mark Johnson|mark.johnson@emai...|NULL|
|  4|         NULL|emily.brown@email...|  76|
|  5|Michael Davis|                NULL|   7|
|  6| Sarah Wilson|sarah.wilson@emai...|NULL|
|  7|    Kevin Lee|                NULL|NULL|
|  8|         NULL|amanda.taylor@ema...|  60|
|  9| Brian Miller|brian.miller@emai...|  43|
| 10|         NULL|jessica.garcia@em...|NULL|
| 11|Eric Robinson|                NULL|  54|
| 12|         NULL|rachel.martinez@e...|  22|
| 13|  Ryan Carter|ryan.carter@email...|NULL|
| 14|         NULL|lisa.turner@email...|  25|
| 15|   Alex Brown|                NULL|  35|
+---+-------------+--------------------+----+



In [29]:
# Drop rows
# With how="any" (the default), a row is removed as soon as at least one of its
# columns is null, even when every other column in that row holds data.

data.na.drop(how="any").show()

+---+------------+--------------------+---+
| id|    fullname|               email|age|
+---+------------+--------------------+---+
|  1|    John Doe|  john.doe@email.com| 32|
|  9|Brian Miller|brian.miller@emai...| 43|
+---+------------+--------------------+---+



In [30]:
# Drop rows using a threshold: a row is kept only if it has at least 3 non-null
# values, i.e. it is dropped when it has fewer than 3.
# `how` is deliberately not passed: when `thresh` is given it takes precedence and
# `how` is ignored, so combining the two only obscures which rule is applied.
data.na.drop(thresh=3).show()

+---+-------------+--------------------+----+
| id|     fullname|               email| age|
+---+-------------+--------------------+----+
|  1|     John Doe|  john.doe@email.com|  32|
|  2|   Jane Smith|                NULL|  44|
|  3| Mark Johnson|mark.johnson@emai...|NULL|
|  4|         NULL|emily.brown@email...|  76|
|  5|Michael Davis|                NULL|   7|
|  6| Sarah Wilson|sarah.wilson@emai...|NULL|
|  8|         NULL|amanda.taylor@ema...|  60|
|  9| Brian Miller|brian.miller@emai...|  43|
| 11|Eric Robinson|                NULL|  54|
| 12|         NULL|rachel.martinez@e...|  22|
| 13|  Ryan Carter|ryan.carter@email...|NULL|
| 14|         NULL|lisa.turner@email...|  25|
| 15|   Alex Brown|                NULL|  35|
+---+-------------+--------------------+----+



In [31]:
# Only the columns listed in `subset` are checked for nulls: rows with a null
# `email` are removed, while nulls in the other columns are left in place.
data.na.drop(subset=['email'], how="any").show()

+---+------------+--------------------+----+
| id|    fullname|               email| age|
+---+------------+--------------------+----+
|  1|    John Doe|  john.doe@email.com|  32|
|  3|Mark Johnson|mark.johnson@emai...|NULL|
|  4|        NULL|emily.brown@email...|  76|
|  6|Sarah Wilson|sarah.wilson@emai...|NULL|
|  8|        NULL|amanda.taylor@ema...|  60|
|  9|Brian Miller|brian.miller@emai...|  43|
| 10|        NULL|jessica.garcia@em...|NULL|
| 12|        NULL|rachel.martinez@e...|  22|
| 13| Ryan Carter|ryan.carter@email...|NULL|
| 14|        NULL|lisa.turner@email...|  25|
+---+------------+--------------------+----+



### Filling The missing values

In [32]:
# Replace nulls with a fixed string, restricted to the columns named in `subset`
# (here only `email`); the result is displayed, not assigned back to `data`.

data.na.fill("Hello missing", subset=['email']).show()


+---+-------------+--------------------+----+
| id|     fullname|               email| age|
+---+-------------+--------------------+----+
|  1|     John Doe|  john.doe@email.com|  32|
|  2|   Jane Smith|       Hello missing|  44|
|  3| Mark Johnson|mark.johnson@emai...|NULL|
|  4|         NULL|emily.brown@email...|  76|
|  5|Michael Davis|       Hello missing|   7|
|  6| Sarah Wilson|sarah.wilson@emai...|NULL|
|  7|    Kevin Lee|       Hello missing|NULL|
|  8|         NULL|amanda.taylor@ema...|  60|
|  9| Brian Miller|brian.miller@emai...|  43|
| 10|         NULL|jessica.garcia@em...|NULL|
| 11|Eric Robinson|       Hello missing|  54|
| 12|         NULL|rachel.martinez@e...|  22|
| 13|  Ryan Carter|ryan.carter@email...|NULL|
| 14|         NULL|lisa.turner@email...|  25|
| 15|   Alex Brown|       Hello missing|  35|
+---+-------------+--------------------+----+



In [33]:
# Re-display the original frame: the result of the preceding `na.fill(...)` call was
# only shown, never assigned, so `data` still contains its NULL emails.
data.show()

+---+-------------+--------------------+----+
| id|     fullname|               email| age|
+---+-------------+--------------------+----+
|  1|     John Doe|  john.doe@email.com|  32|
|  2|   Jane Smith|                NULL|  44|
|  3| Mark Johnson|mark.johnson@emai...|NULL|
|  4|         NULL|emily.brown@email...|  76|
|  5|Michael Davis|                NULL|   7|
|  6| Sarah Wilson|sarah.wilson@emai...|NULL|
|  7|    Kevin Lee|                NULL|NULL|
|  8|         NULL|amanda.taylor@ema...|  60|
|  9| Brian Miller|brian.miller@emai...|  43|
| 10|         NULL|jessica.garcia@em...|NULL|
| 11|Eric Robinson|                NULL|  54|
| 12|         NULL|rachel.martinez@e...|  22|
| 13|  Ryan Carter|ryan.carter@email...|NULL|
| 14|         NULL|lisa.turner@email...|  25|
| 15|   Alex Brown|                NULL|  35|
+---+-------------+--------------------+----+



## Imputing missing values with `Imputer`


In [34]:
from pyspark.ml.feature import Imputer

In [41]:
# Configure an Imputer that reads the `age` column and writes the completed values
# to a new `age_imputed` column, using the median strategy for the replacement value.
imputer = Imputer(
    strategy="median",
    inputCols=["age"],
    outputCols=["age_imputed"],
)

In [42]:
# Fit the imputer on `data`, then apply the fitted model to the same frame so the
# imputed column is appended next to the original one.
imputer_model = imputer.fit(data)
data_imputed = imputer_model.transform(data)
data_imputed.show()

+---+-------------+--------------------+----+-----------+
| id|     fullname|               email| age|age_imputed|
+---+-------------+--------------------+----+-----------+
|  1|     John Doe|  john.doe@email.com|  32|         32|
|  2|   Jane Smith|                NULL|  44|         44|
|  3| Mark Johnson|mark.johnson@emai...|NULL|         35|
|  4|         NULL|emily.brown@email...|  76|         76|
|  5|Michael Davis|                NULL|   7|          7|
|  6| Sarah Wilson|sarah.wilson@emai...|NULL|         35|
|  7|    Kevin Lee|                NULL|NULL|         35|
|  8|         NULL|amanda.taylor@ema...|  60|         60|
|  9| Brian Miller|brian.miller@emai...|  43|         43|
| 10|         NULL|jessica.garcia@em...|NULL|         35|
| 11|Eric Robinson|                NULL|  54|         54|
| 12|         NULL|rachel.martinez@e...|  22|         22|
| 13|  Ryan Carter|ryan.carter@email...|NULL|         35|
| 14|         NULL|lisa.turner@email...|  25|         25|
| 15|   Alex B

# Filter Operations

In [44]:
### Find people younger than 50 years old

In [48]:
# Keep only the rows whose age is below 50 (rows with a NULL age do not match).
under_50 = data.filter("age < 50")
under_50.show()

+---+-------------+--------------------+---+
| id|     fullname|               email|age|
+---+-------------+--------------------+---+
|  1|     John Doe|  john.doe@email.com| 32|
|  2|   Jane Smith|                NULL| 44|
|  5|Michael Davis|                NULL|  7|
|  9| Brian Miller|brian.miller@emai...| 43|
| 12|         NULL|rachel.martinez@e...| 22|
| 14|         NULL|lisa.turner@email...| 25|
| 15|   Alex Brown|                NULL| 35|
+---+-------------+--------------------+---+



In [51]:
# Apply the age condition first, then project only the name and age columns.
under_50 = data.filter("age < 50")
under_50.select('fullname', 'age').show()

+-------------+---+
|     fullname|age|
+-------------+---+
|     John Doe| 32|
|   Jane Smith| 44|
|Michael Davis|  7|
| Brian Miller| 43|
|         NULL| 22|
|         NULL| 25|
|   Alex Brown| 35|
+-------------+---+



In [58]:
# Add a synthetic "Salary" column of pseudo-random values rounded to 2 decimals.
#
# - Spark's SQL functions are reached through the `F` alias instead of being imported
#   by bare name, so this cell no longer shadows Python's builtin `round` for every
#   cell that runs after it.
# - Each `rand` call gets an explicit seed so re-running the notebook on the same
#   data is intended to give the same salaries; the two seeds differ so the two
#   random terms are not the same draw.
from pyspark.sql import functions as F

data = data.withColumn(
    "Salary",
    F.round(F.rand(seed=42) * 10000 + F.rand(seed=43) * 13, 2),
)
data.show()

+---+-------------+--------------------+----+-------+
| id|     fullname|               email| age| Salary|
+---+-------------+--------------------+----+-------+
|  1|     John Doe|  john.doe@email.com|  32|9716.43|
|  2|   Jane Smith|                NULL|  44|9940.57|
|  3| Mark Johnson|mark.johnson@emai...|NULL|2774.51|
|  4|         NULL|emily.brown@email...|  76|5006.51|
|  5|Michael Davis|                NULL|   7|2723.69|
|  6| Sarah Wilson|sarah.wilson@emai...|NULL|7490.79|
|  7|    Kevin Lee|                NULL|NULL|8917.66|
|  8|         NULL|amanda.taylor@ema...|  60|1913.98|
|  9| Brian Miller|brian.miller@emai...|  43|9039.98|
| 10|         NULL|jessica.garcia@em...|NULL|1158.82|
| 11|Eric Robinson|                NULL|  54|6786.92|
| 12|         NULL|rachel.martinez@e...|  22|1807.43|
| 13|  Ryan Carter|ryan.carter@email...|NULL|6746.71|
| 14|         NULL|lisa.turner@email...|  25|2671.34|
| 15|   Alex Brown|                NULL|  35|7313.46|
+---+-------------+---------

In [61]:
# Filter on two conditions at once: each comparison is built as its own column
# expression and the two are combined with `&` (both must hold for a row to be kept).
salary_above_4500 = data["Salary"] > 4500
age_above_25 = data['age'] > 25
data.filter(salary_above_4500 & age_above_25).show()

+---+-------------+--------------------+---+-------+
| id|     fullname|               email|age| Salary|
+---+-------------+--------------------+---+-------+
|  1|     John Doe|  john.doe@email.com| 32|9716.43|
|  2|   Jane Smith|                NULL| 44|9940.57|
|  4|         NULL|emily.brown@email...| 76|5006.51|
|  9| Brian Miller|brian.miller@emai...| 43|9039.98|
| 11|Eric Robinson|                NULL| 54|6786.92|
| 15|   Alex Brown|                NULL| 35|7313.46|
+---+-------------+--------------------+---+-------+



In [None]:
# 52:44 