In [1]:
sc

In [3]:
from pyspark.sql.functions import *

# Define a dataset.

df = sc.parallelize([
    (10, '', 10000), (20, 'Female', 30000), (None, 'Male', 80000), (None, 'Male', 5000)
]).toDF(["age", "gender", "income"])

df.show()

+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|null|  Male| 80000|
|null|  Male|  5000|
+----+------+------+



In [5]:
# EDA
df.describe().show()



+-------+------------------+------+-----------------+
|summary|               age|gender|           income|
+-------+------------------+------+-----------------+
|  count|                 2|     4|                4|
|   mean|              15.0|  null|          31250.0|
| stddev|7.0710678118654755|  null|34247.87098005753|
|    min|                10|      |             5000|
|    max|                20|  Male|            80000|
+-------+------------------+------+-----------------+



                                                                                

In [6]:
# Data Cleansing: Null
# Treat Null Value (None) with Average one.

avg_age = df.na.drop().agg(avg("age")).collect()[0][0]

sparkf_replaceNull = udf(lambda x: avg_age if x == None else x)

no_null_df = df.withColumn('age', sparkf_replaceNull(col('age')))

no_null_df.show()

                                                                                

+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|15.0|  Male| 80000|
|15.0|  Male|  5000|
+----+------+------+



In [7]:
# Data Cleansing: Empty Values
# Treat Missing Value with Defined Values.

from pyspark.sql.functions import *

treat_missing = udf(lambda x: "Male_Assume" if x == "" else x)

no_missing_df = no_null_df.withColumn('new_gender',treat_missing(no_null_df.gender))

no_missing_df.show()

[Stage 14:>                                                         (0 + 1) / 1]

+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
|15.0|  Male|  5000|       Male|
+----+------+------+-----------+



                                                                                

In [8]:
# Data Cleansing: Outliers (Business-oriented)
# Treat Outliner with Remove one.

no_outlier_df = no_missing_df.filter(col('income') >= 10000)

no_outlier_df .show()

+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
+----+------+------+-----------+



## Data Normalization using DataFrame APIs.

In [1]:
colors = ['white','green','yellow','red','brown','pink'] 
color_rdd = sc.parallelize(colors)
keyval_rdd = color_rdd.map(lambda x:(x,len(x)))
keyval_rdd.collect()

                                                                                

[('white', 5),
 ('green', 5),
 ('yellow', 6),
 ('red', 3),
 ('brown', 5),
 ('pink', 4)]

In [5]:
color_df = keyval_rdd.toDF(['color','length']) 
color_df.show()

[Stage 5:>                                                          (0 + 1) / 1]

+------+------+
| color|length|
+------+------+
| white|     5|
| green|     5|
|yellow|     6|
|   red|     3|
| brown|     5|
|  pink|     4|
+------+------+



                                                                                

In [6]:
from pyspark.sql.functions import *

In [7]:
min_length = color_df.select(min('length')).collect()[0][0]
min_length

3

In [8]:
max_length = color_df.select(max('length')).collect()[0][0]
max_length

6

In [9]:
max_min_normalization = udf(lambda x: (x-min_length)/(max_length-min_length))

In [10]:
color_df2 = color_df.withColumn('normalliztion', max_min_normalization(color_df.length))

In [11]:
color_df2.show()

+------+------+------------------+
| color|length|     normalliztion|
+------+------+------------------+
| white|     5|0.6666666666666666|
| green|     5|0.6666666666666666|
|yellow|     6|               1.0|
|   red|     3|               0.0|
| brown|     5|0.6666666666666666|
|  pink|     4|0.3333333333333333|
+------+------+------------------+

