# PySpark: Data Cleaning

In [None]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.mllib.stat import Statistics

import numpy as np
import pandas as pd

## 1. Miscellaneous techniques

### 1.1. Common techniques

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

#### Renaming columns

In [2]:
dfFish = spark.read.csv('../data/us_fishery_trade.csv', header=True)
dfFish.limit(5)

Year,Month,Product Name,Country Name,Month number,Value,Feature,Unit
2010,January,SABLEFISH FRESH,UNITED ARAB EMIRATES,1,2297,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,16025,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,63437,EXP Quantity,kg
2010,January,MONKFISH FRESH,CANADA,1,579,EXP Quantity,kg
2010,January,MONKFISH FRESH,CANADA,1,7975,EXP Quantity,kg


In [3]:
dfFish.withColumnRenamed('Year', 'year').limit(3)

year,Month,Product Name,Country Name,Month number,Value,Feature,Unit
2010,January,SABLEFISH FRESH,UNITED ARAB EMIRATES,1,2297,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,16025,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,63437,EXP Quantity,kg


In [4]:
dfFish.selectExpr('Year as year', 'Month as month').limit(3)

year,month
2010,January
2010,January
2010,January


In [5]:
column_names = [c.lower().replace(' ', '_') for c in dfFish.columns]
dfFish.toDF(*column_names).limit(3)

year,month,product_name,country_name,month_number,value,feature,unit
2010,January,SABLEFISH FRESH,UNITED ARAB EMIRATES,1,2297,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,16025,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,63437,EXP Quantity,kg


In [6]:
dfFish.select([F.col(c).alias(c.lower().replace(' ', '_')) for c in dfFish.columns]).limit(3)

year,month,product_name,country_name,month_number,value,feature,unit
2010,January,SABLEFISH FRESH,UNITED ARAB EMIRATES,1,2297,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,16025,EXP Quantity,kg
2010,January,SABLEFISH FRESH,JAPAN,1,63437,EXP Quantity,kg


#### Selecting columns

In [7]:
dfBoston = spark.read.csv('../data/boston.csv', header=True)
dfBoston.limit(5)

crime_rate,land_rate,indus,chas,nox,room,age,distance,radial,tax,ptratio,black,lstat,price
0.00632,18,2.31,0,0.538,6.575,65.2,4.09,1,296,15.3,396.9,4.98,24.0
0.02731,0,7.07,0,0.469,6.421,78.9,4.9671,2,242,17.8,396.9,9.14,21.6
0.02729,0,7.07,0,0.469,7.185,61.1,4.9671,2,242,17.8,392.83,4.03,34.7
0.03237,0,2.18,0,0.458,6.998,45.8,6.0622,3,222,18.7,394.63,2.94,33.4
0.06905,0,2.18,0,0.458,7.147,54.2,6.0622,3,222,18.7,396.9,5.33,36.2


In [8]:
columns = ['indus', 'chas', 'nox', 'room']
dfBoston.select(columns).limit(3)

indus,chas,nox,room
2.31,0,0.538,6.575
7.07,0,0.469,6.421
7.07,0,0.469,7.185


In [9]:
dfBoston.selectExpr('round(ptratio + price, 2) as new_variable').limit(3)

new_variable
39.3
39.4
52.5


In [10]:
dfBoston.drop('distance', 'radial', 'tax', 'ptratio', 'black', 'lstat', 'price').limit(3)

crime_rate,land_rate,indus,chas,nox,room,age
0.00632,18,2.31,0,0.538,6.575,65.2
0.02731,0,7.07,0,0.469,6.421,78.9
0.02729,0,7.07,0,0.469,7.185,61.1


#### Correcting data types

In [11]:
dfYoutube = spark.read.csv('../data/youtube_trending.csv', header=True)
dfYoutube.limit(5)

video_id,trending_date,channel_title,category_id,publish_time,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled
2kyS6SvSYSE,2017-11-14,CaseyNeistat,22,2017-11-13T17:13:...,748374,57527,2966,15954,False,False
1ZAPwfrtAFY,2017-11-14,LastWeekTonight,24,2017-11-13T07:30:...,2418783,97185,6146,12703,False,False
5qpjK5DgCt4,2017-11-14,Rudy Mancuso,23,2017-11-12T19:05:...,3191434,146033,5339,8181,False,False
puqaWrEC7tY,2017-11-14,Good Mythical Mor...,24,2017-11-13T11:00:...,343168,10172,666,2146,False,False
d380meD0W0M,2017-11-14,nigahiga,24,2017-11-12T18:01:...,2095731,132235,1989,17518,False,False


In [12]:
dfYoutube\
    .withColumn('trending_date', F.to_date('trending_date'))\
    .withColumn('publish_time', F.to_timestamp('publish_time'))\
    .withColumn('category_id', F.col('category_id').cast('int'))\
    .withColumn('views', F.col('views').cast('int'))\
    .withColumn('likes', F.col('likes').cast('int'))\
    .withColumn('dislikes', F.col('dislikes').cast('int'))\
    .withColumn('comment_count', F.col('comment_count').cast('int'))\
    .withColumn('comments_disabled', F.col('comments_disabled').cast('boolean'))\
    .withColumn('ratings_disabled', F.col('ratings_disabled').cast('boolean'))\
    .limit(5)

video_id,trending_date,channel_title,category_id,publish_time,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled
2kyS6SvSYSE,2017-11-14,CaseyNeistat,22,2017-11-14 00:13:01,748374,57527,2966,15954,False,False
1ZAPwfrtAFY,2017-11-14,LastWeekTonight,24,2017-11-13 14:30:00,2418783,97185,6146,12703,False,False
5qpjK5DgCt4,2017-11-14,Rudy Mancuso,23,2017-11-13 02:05:24,3191434,146033,5339,8181,False,False
puqaWrEC7tY,2017-11-14,Good Mythical Mor...,24,2017-11-13 18:00:04,343168,10172,666,2146,False,False
d380meD0W0M,2017-11-14,nigahiga,24,2017-11-13 01:01:41,2095731,132235,1989,17518,False,False


### 1.2. Text manipulation

In [13]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [14]:
dfSupermarket = spark.read.csv('../data/supermarket_sales.csv', header=True)
dfSupermarket.limit(5)

invoice_id,brand,city,customer_type,gender,product_line,unit_price,quantity,tax,date,time,payment,cost,gross_margin_percentage,profit,rating
750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,01/05/2019,13:08,Ewallet,522.83,4.761904762,26.1415,9.1
226-31-3081,C,Naypyitaw,Normal,Female,Electronic access...,15.28,5,3.82,03/08/2019,10:29,Cash,76.4,4.761904762,3.82,9.6
631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,03/03/2019,13:23,Credit card,324.31,4.761904762,16.2155,7.4
123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,1/27/2019,20:33,Ewallet,465.76,4.761904762,23.288,8.4
373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,02/08/2019,10:37,Ewallet,604.17,4.761904762,30.2085,5.3


#### Extracting substrings

In [15]:
invoice_id = F.split('invoice_id', pattern='-')
dfSupermarket\
    .select('invoice_id')\
    .withColumn('first_part', invoice_id.getItem(0))\
    .withColumn('second_part', invoice_id.getItem(1))\
    .withColumn('third_part', invoice_id.getItem(2))\
    .limit(5)

invoice_id,first_part,second_part,third_part
750-67-8428,750,67,8428
226-31-3081,226,31,3081
631-41-3108,631,41,3108
123-19-1176,123,19,1176
373-73-7910,373,73,7910


In [16]:
dfSupermarket.select('city', F.substring('city', 0, 4)).limit(5)

city,"substring(city, 0, 4)"
Yangon,Yang
Naypyitaw,Nayp
Yangon,Yang
Yangon,Yang
Yangon,Yang


#### Concatenating columns

In [17]:
dfSupermarket.select(F.concat('customer_type', F.lit(', '), 'gender')).limit(3)

"concat(customer_type, , , gender)"
"Member, Female"
"Normal, Female"
"Normal, Male"


#### Transforming case

In [18]:
dfSupermarket.select(F.upper('city')).limit(3)

upper(city)
YANGON
NAYPYITAW
YANGON


In [19]:
dfSupermarket.select(F.initcap('product_line')).limit(3)

initcap(product_line)
Health And Beauty
Electronic Access...
Home And Lifestyle


#### Padding

In [20]:
dfSupermarket.select('city').distinct().select(F.lpad('city', len=9, pad='*'))

"lpad(city, 9, *)"
Naypyitaw
*Mandalay
***Yangon


In [21]:
dfSupermarket.select('city').distinct().select(F.rpad('city', len=9, pad='*'))

"rpad(city, 9, *)"
Naypyitaw
Mandalay*
Yangon***


#### Trimming
PySpark supports `trim()`, `ltrim()` and `rtrim()` functions.

In [22]:
df = spark.createDataFrame(
    data=[['United Kingdom'], [' United Kingdom'], ['United Kingdom ']],
    schema=['country'])
df.select(F.countDistinct('country'))

count(DISTINCT country)
3


In [23]:
df.select(F.trim('country').alias('country')).select(F.countDistinct('country'))

count(DISTINCT country)
1


#### Replacing

In [24]:
dfSupermarket\
    .select('product_line').distinct()\
    .withColumn('product_line_new', F.regexp_replace('product_line', pattern=' ', replacement='_'))

product_line,product_line_new
Home and lifestyle,Home_and_lifestyle
Fashion accessories,Fashion_accessories
Health and beauty,Health_and_beauty
Electronic access...,Electronic_access...
Food and beverages,Food_and_beverages
Sports and travel,Sports_and_travel


#### Other techniques

In [25]:
dfSupermarket.select('product_line').distinct().where(~F.col('product_line').contains('ty'))

product_line
Fashion accessories
Electronic access...
Food and beverages
Sports and travel


### 1.3. Date manipulation

In [26]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [27]:
dfYoutube = spark.read.csv('../data/youtube_trending.csv', header=True)
dfYoutube.limit(5)

video_id,trending_date,channel_title,category_id,publish_time,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled
2kyS6SvSYSE,2017-11-14,CaseyNeistat,22,2017-11-13T17:13:...,748374,57527,2966,15954,False,False
1ZAPwfrtAFY,2017-11-14,LastWeekTonight,24,2017-11-13T07:30:...,2418783,97185,6146,12703,False,False
5qpjK5DgCt4,2017-11-14,Rudy Mancuso,23,2017-11-12T19:05:...,3191434,146033,5339,8181,False,False
puqaWrEC7tY,2017-11-14,Good Mythical Mor...,24,2017-11-13T11:00:...,343168,10172,666,2146,False,False
d380meD0W0M,2017-11-14,nigahiga,24,2017-11-12T18:01:...,2095731,132235,1989,17518,False,False


#### The standard datetime format

In [28]:
dfYoutube = dfYoutube\
    .withColumn('trending_date', F.to_date('trending_date'))\
    .withColumn('publish_time', F.to_timestamp('publish_time'))
dfYoutube.select('trending_date', 'publish_time').distinct().limit(5)

trending_date,publish_time
2017-11-14,2017-11-09 04:50:37
2017-11-15,2017-11-14 07:45:15
2017-11-15,2017-11-14 03:47:49
2017-11-16,2017-11-14 03:00:01
2017-11-18,2017-11-13 21:44:24


#### Unix timestamp

In [29]:
dfYoutube.select(F.unix_timestamp('publish_time').alias('unix_publish_time')).limit(3)

unix_publish_time
1510593181
1510558200
1510513524


#### Extracting date part
PySpark supports a wide range of [datetime functions](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html).

In [30]:
dfYoutube.select(
    F.col('publish_time'),
    F.date_format('publish_time', 'yyyy').alias('year'),
    F.date_format('publish_time', 'yyyy-q').alias('quarter'),
    F.concat(F.year('publish_time'), F.lit('-'), F.weekofyear('publish_time')).alias('week'),
).limit(5)

publish_time,year,quarter,week
2017-11-14 00:13:01,2017,2017-4,2017-46
2017-11-13 14:30:00,2017,2017-4,2017-46
2017-11-13 02:05:24,2017,2017-4,2017-46
2017-11-13 18:00:04,2017,2017-4,2017-46
2017-11-13 01:01:41,2017,2017-4,2017-46


#### Extracting cyclic attributes

In [31]:
dfYoutube.select(
    F.col('publish_time'),
    F.year('publish_time').alias('year'),
    F.quarter('publish_time').alias('quarter'),
    F.month('publish_time').alias('month'),
    F.date_format('publish_time', 'MMMM').alias('month_name'),
    F.weekofyear('publish_time').alias('week_of_year'),
    F.dayofyear('publish_time').alias('day_of_year'),
    F.dayofmonth('publish_time').alias('day'),
    F.date_format('publish_time', 'EEEE').alias('weekday'),
    F.hour('publish_time').alias('hour'),
    F.minute('publish_time').alias('minute'),
    F.second('publish_time').alias('second'),
).limit(5)

publish_time,year,quarter,month,month_name,week_of_year,day_of_year,day,weekday,hour,minute,second
2017-11-14 00:13:01,2017,4,11,November,46,318,14,Tuesday,0,13,1
2017-11-13 14:30:00,2017,4,11,November,46,317,13,Monday,14,30,0
2017-11-13 02:05:24,2017,4,11,November,46,317,13,Monday,2,5,24
2017-11-13 18:00:04,2017,4,11,November,46,317,13,Monday,18,0,4
2017-11-13 01:01:41,2017,4,11,November,46,317,13,Monday,1,1,41


#### Rounding date
PySpark allows rounding datetime using the `date_trunc()` function with specifics values of the `format` parameter: `year`, `quarter`, `month`, `day`, `hour`, `minute`, `second`.

In [32]:
dfYoutube.select(F.date_trunc(format='day', timestamp='publish_time')).distinct().limit(3)

"date_trunc(day, publish_time)"
2017-11-05 00:00:00
2016-06-20 00:00:00
2017-10-21 00:00:00


#### Date calculation

In [33]:
dfYoutube.limit(3)

video_id,trending_date,channel_title,category_id,publish_time,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled
2kyS6SvSYSE,2017-11-14,CaseyNeistat,22,2017-11-14 00:13:01,748374,57527,2966,15954,False,False
1ZAPwfrtAFY,2017-11-14,LastWeekTonight,24,2017-11-13 14:30:00,2418783,97185,6146,12703,False,False
5qpjK5DgCt4,2017-11-14,Rudy Mancuso,23,2017-11-13 02:05:24,3191434,146033,5339,8181,False,False


In [34]:
dfYoutube.select(
    'trending_date',
    F.datediff(F.current_date(), F.col('trending_date')).alias('days_to_now')
).distinct().limit(5)

trending_date,days_to_now
2017-12-27,1308
2017-11-28,1337
2017-11-22,1343
2017-12-25,1310
2017-11-17,1348


In [35]:
dfYoutube.select(
    'trending_date',
    F.date_add(F.col('trending_date'), days=-20).alias('20_days_ealier')
).distinct().limit(5)

trending_date,20_days_ealier
2017-12-11,2017-11-21
2017-12-03,2017-11-13
2017-12-17,2017-11-27
2017-11-30,2017-11-10
2017-11-14,2017-10-25


In [36]:
dfYoutube.select(
    'trending_date',
    F.add_months(F.col('trending_date'), months=12).alias('same_day_next_year')
).distinct().limit(5)

trending_date,same_day_next_year
2017-12-06,2018-12-06
2017-12-23,2018-12-23
2017-12-07,2018-12-07
2017-12-17,2018-12-17
2017-11-14,2018-11-14


### 1.4. Numerical functions

In [37]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [38]:
dfYoutube = spark.read.csv('../data/youtube_trending.csv', header=True)
dfYoutube.limit(5)

video_id,trending_date,channel_title,category_id,publish_time,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled
2kyS6SvSYSE,2017-11-14,CaseyNeistat,22,2017-11-13T17:13:...,748374,57527,2966,15954,False,False
1ZAPwfrtAFY,2017-11-14,LastWeekTonight,24,2017-11-13T07:30:...,2418783,97185,6146,12703,False,False
5qpjK5DgCt4,2017-11-14,Rudy Mancuso,23,2017-11-12T19:05:...,3191434,146033,5339,8181,False,False
puqaWrEC7tY,2017-11-14,Good Mythical Mor...,24,2017-11-13T11:00:...,343168,10172,666,2146,False,False
d380meD0W0M,2017-11-14,nigahiga,24,2017-11-12T18:01:...,2095731,132235,1989,17518,False,False


In [39]:
dfYoutube.select('likes', F.round('likes', -3).cast('int')).limit(3)

likes,"CAST(round(likes, -3) AS INT)"
57527,58000
97185,97000
146033,146000


In [40]:
dfYoutube.select(F.max('likes'))

max(likes)
99980


In [41]:
dfYoutube.select(F.sum('likes'))

sum(likes)
468961011.0


In [42]:
dfYoutube.select('likes', F.cbrt('likes').cast('int')).limit(3)

likes,CAST(CBRT(likes) AS INT)
57527,38
97185,45
146033,52


## 2. Handling abnormality

### 2.1. Duplicated values

In [43]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [44]:
dfYoutube = spark.read.csv('../data/youtube_trending.csv', header=True)
dfYoutube.limit(5)

video_id,trending_date,channel_title,category_id,publish_time,views,likes,dislikes,comment_count,comments_disabled,ratings_disabled
2kyS6SvSYSE,2017-11-14,CaseyNeistat,22,2017-11-13T17:13:...,748374,57527,2966,15954,False,False
1ZAPwfrtAFY,2017-11-14,LastWeekTonight,24,2017-11-13T07:30:...,2418783,97185,6146,12703,False,False
5qpjK5DgCt4,2017-11-14,Rudy Mancuso,23,2017-11-12T19:05:...,3191434,146033,5339,8181,False,False
puqaWrEC7tY,2017-11-14,Good Mythical Mor...,24,2017-11-13T11:00:...,343168,10172,666,2146,False,False
d380meD0W0M,2017-11-14,nigahiga,24,2017-11-12T18:01:...,2095731,132235,1989,17518,False,False


In [45]:
# keep first observation only
dfYoutube.select('category_id').drop_duplicates()

category_id
15
29
22
28
43
27
17
26
19
23


### 2.2. Handling missing values
Missing values in PySpark are represented by `null` or `nan`.

In [46]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [47]:
data = [
    ('Laptop', '$1000', None),
    ('Mouse', '$20', 100),
    ('Headphone', '$50', 50),
    ('USB', None, None)
]

schema = ['product', 'price', 'stock']

dfProduct = spark.createDataFrame(data, schema)
dfProduct

product,price,stock
Laptop,$1000,
Mouse,$20,100.0
Headphone,$50,50.0
USB,,


In [48]:
dfProduct.filter('stock IS NULL and price IS NULL')

product,price,stock
USB,,


In [49]:
dfProduct.dropna(subset=['stock'],how='any')

product,price,stock
Mouse,$20,100
Headphone,$50,50


In [50]:
dfProduct.fillna({
    'price': 100,
    'stock': 0
})

product,price,stock
Laptop,$1000,0
Mouse,$20,100
Headphone,$50,50
USB,100,0
