<a href="https://colab.research.google.com/github/frong123nk/Data_Cleansing/blob/main/Data_Cleansing_with_Spark_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Data from**
https://archive.ics.uci.edu/ml/datasets/Online+Retail

### setup library

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xzvf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark==1.3.0

In [2]:
# Set enviroment variable for Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# pip pyspark
!pip install pyspark==3.1.2

In [6]:
#Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "legacy")

In [9]:
import sys
sys.version_info

sys.version_info(major=3, minor=7, micro=11, releaselevel='final', serial=0)

In [10]:
spark.version

'3.1.2'

In [11]:
# Google colab connect to Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Load data

In [12]:
#Load data
dt = spark.read.csv('/content/drive/My Drive/data_cleansing/Online Retail.csv', header = True, inferSchema = True, )

In [13]:
#check type
dt.dtypes 

[('InvoiceNo', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'int'),
 ('InvoiceDate', 'string'),
 ('UnitPrice', 'double'),
 ('CustomerID', 'double'),
 ('Country', 'string')]

In [14]:
dt.show(10)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1/12/2018 08:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1/12/2018 08:26|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1/12/2018 08:26|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1/12/2018 08:

In [15]:
# count total data and columns
print((dt.count(), len(dt.columns)))

(541909, 8)


###Type Conversion





In [16]:
# Show unique Invoice Date
dt.select("InvoiceDate").distinct().show(5)

+----------------+
|     InvoiceDate|
+----------------+
| 2/12/2018 11:23|
| 6/12/2018 13:41|
| 9/12/2018 14:44|
|13/12/2018 13:09|
|13/12/2018 16:46|
+----------------+
only showing top 5 rows



In [19]:
# convert string to date
# date formate => DD/MM/YYYY
from pyspark.sql import functions as f

dt_temp = dt.withColumn('InvoiceDateTime', 
    f.unix_timestamp('InvoiceDate', 'dd/MM/yyyy HH:mm').cast('timestamp')
)
dt_final = dt_temp.drop('InvoiceDate')
dt_final.show(5)

+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|    InvoiceDateTime|
+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|   17850.0|United Kingdom|2018-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|   17850.0|United Kingdom|2018-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|   17850.0|United Kingdom|2018-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|   17850.0|United Kingdom|2018-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|   17850.0|United Kingdom|2018-12-01 08:26:00|
+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
only showing top 5 rows



In [18]:
#show schema
dt_final.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- InvoiceDateTime: timestamp (nullable = true)



### **Anomalies Check**

#### Syntactical Anomalies

In [20]:
# Check country distinct values.
dt_final.select("Country").distinct().show()

+------------------+
|           Country|
+------------------+
|            Sweden|
|         Singapore|
|           Germany|
|               RSA|
|            France|
|            Greece|
|European Community|
|           Belgium|
|           Finland|
|             Malta|
|       Unspecified|
|             Italy|
|              EIRE|
|         Lithuania|
|            Norway|
|             Spain|
|             EIREs|
|           Denmark|
|         Hong Kong|
|           Iceland|
+------------------+
only showing top 20 rows



In [21]:
# convert EIREs to EIRE
from pyspark.sql.functions import when
dt_temp_eire = dt_final.withColumn("CountryUpdate", when(dt_final['Country'] == 'EIREs', 'EIRE').otherwise(dt_final['Country']))
dt_temp_eire.select("CountryUpdate").distinct().show()


+------------------+
|     CountryUpdate|
+------------------+
|            Sweden|
|         Singapore|
|           Germany|
|               RSA|
|            France|
|            Greece|
|European Community|
|           Belgium|
|           Finland|
|             Malta|
|       Unspecified|
|             Italy|
|              EIRE|
|         Lithuania|
|            Norway|
|             Spain|
|           Denmark|
|         Hong Kong|
|           Iceland|
|            Israel|
+------------------+
only showing top 20 rows



In [22]:
#drop column and rename CountryUpdate to Country
dt_final_eire = dt_temp_eire.drop("Country").withColumnRenamed('CountryUpdate', 'Country')
dt_final_eire.show(5)

+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|    InvoiceDateTime|       Country|
+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|   17850.0|2018-12-01 08:26:00|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|   17850.0|2018-12-01 08:26:00|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|
+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+
only showing top 5 rows



#### Semantic Anomalies

StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.

In [23]:
dt_final_eire.count()

541909

In [25]:
# Use regular expressions to filter correct data in stockcode column
dt_final_eire.filter(dt_final_eire["Stockcode"].rlike("^[0-9]{5}$")).count()

487036

In [26]:
#show data incorrect
dt_correct_stockcode = dt_final_eire.filter(dt_final_eire["Stockcode"].rlike("^[0-9]{5}$"))
dt_incorrect_stockcode = dt_final_eire.subtract(dt_correct_stockcode)
dt_incorrect_stockcode.show(5)

+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|    InvoiceDateTime|       Country|
+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+
|   540122|   84926E|FLOWERS TILE COASTER|      96|     0.42|   13694.0|2019-01-05 10:39:00|United Kingdom|
|   542936|   84596G|SMALL CHOCOLATES ...|       1|     1.25|      null|2019-02-02 11:10:00|United Kingdom|
|   539479|   84997B|RED 3 PIECE RETRO...|       1|     3.75|      null|2018-12-19 15:20:00|United Kingdom|
|   545308|   84406B|CREAM CUPID HEART...|       1|     4.15|   14656.0|2019-03-01 13:19:00|United Kingdom|
|   576687|   85099B|JUMBO BAG RED RET...|       3|     2.08|   13558.0|2019-11-16 12:13:00|United Kingdom|
+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+
only showing top 5 rows



In [28]:
# delete the last character of stockcode
from pyspark.sql.functions import regexp_replace

dt_temp_stockcode = dt_final_eire.withColumn("StockcodeUpdate", regexp_replace(dt_final_eire['Stockcode'], r'[A-z]', ''))

In [29]:
# Check the result
dt_temp_stockcode.show()

+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+---------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|    InvoiceDateTime|       Country|StockcodeUpdate|
+---------+---------+--------------------+--------+---------+----------+-------------------+--------------+---------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|   17850.0|2018-12-01 08:26:00|United Kingdom|          85123|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|          71053|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|   17850.0|2018-12-01 08:26:00|United Kingdom|          84406|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|          84029|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|          84029|
|   5363

In [30]:
# Create final Dataframe
dt_final_stockcode = dt_temp_stockcode.drop("Stockcode").withColumnRenamed('StockcodeUpdate', 'StockCode')
dt_final_stockcode.show(4)

+---------+--------------------+--------+---------+----------+-------------------+--------------+---------+
|InvoiceNo|         Description|Quantity|UnitPrice|CustomerID|    InvoiceDateTime|       Country|StockCode|
+---------+--------------------+--------+---------+----------+-------------------+--------------+---------+
|   536365|WHITE HANGING HEA...|       6|     2.55|   17850.0|2018-12-01 08:26:00|United Kingdom|    85123|
|   536365| WHITE METAL LANTERN|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|    71053|
|   536365|CREAM CUPID HEART...|       8|     2.75|   17850.0|2018-12-01 08:26:00|United Kingdom|    84406|
|   536365|KNITTED UNION FLA...|       6|     3.39|   17850.0|2018-12-01 08:26:00|United Kingdom|    84029|
+---------+--------------------+--------+---------+----------+-------------------+--------------+---------+
only showing top 4 rows



#### Missing values

In [32]:
# Check quantity missing values in each column
from pyspark.sql.functions import col,sum

dt_final_stockcode.select(*[sum(col(c).isNull().cast("int")).alias(c) for c in dt_final_stockcode.columns]).show()

+---------+-----------+--------+---------+----------+---------------+-------+---------+
|InvoiceNo|Description|Quantity|UnitPrice|CustomerID|InvoiceDateTime|Country|StockCode|
+---------+-----------+--------+---------+----------+---------------+-------+---------+
|        0|       1454|       0|        0|    135080|              0|      0|        0|
+---------+-----------+--------+---------+----------+---------------+-------+---------+



In [38]:
# Check null in CustomerID 
dt_final_stockcode.where( dt_final_stockcode['CustomerID'].isNull() ).show()

+---------+--------------------+--------+---------+----------+-------------------+--------------+---------+
|InvoiceNo|         Description|Quantity|UnitPrice|CustomerID|    InvoiceDateTime|       Country|StockCode|
+---------+--------------------+--------+---------+----------+-------------------+--------------+---------+
|   536414|                null|      56|      0.0|      null|2018-12-01 11:52:00|United Kingdom|    22139|
|   536544|DECORATIVE ROSE B...|       1|     2.51|      null|2018-12-01 14:32:00|United Kingdom|    21773|
|   536544|DECORATIVE CATS B...|       2|     2.51|      null|2018-12-01 14:32:00|United Kingdom|    21774|
|   536544|  POLKADOT RAIN HAT |       4|     0.85|      null|2018-12-01 14:32:00|United Kingdom|    21786|
|   536544|RAIN PONCHO RETRO...|       2|     1.66|      null|2018-12-01 14:32:00|United Kingdom|    21787|
|   536544|  VINTAGE SNAP CARDS|       9|     1.66|      null|2018-12-01 14:32:00|United Kingdom|    21790|
|   536544|VINTAGE HEADS AND

In [42]:
#convert null to -1
dt_customer_notnull = dt_final_stockcode.withColumn("CustomerIDUpdate", when(dt_final_stockcode['customerID'].isNull(), -1).otherwise(dt_final_stockcode['customerID']))
dt_final = dt_customer_notnull.drop('customerID').withColumnRenamed('CustomerIDUpdate','customerID')
dt_final.show()

+---------+--------------------+--------+---------+-------------------+--------------+---------+----------+
|InvoiceNo|         Description|Quantity|UnitPrice|    InvoiceDateTime|       Country|StockCode|customerID|
+---------+--------------------+--------+---------+-------------------+--------------+---------+----------+
|   536365|WHITE HANGING HEA...|       6|     2.55|2018-12-01 08:26:00|United Kingdom|    85123|   17850.0|
|   536365| WHITE METAL LANTERN|       6|     3.39|2018-12-01 08:26:00|United Kingdom|    71053|   17850.0|
|   536365|CREAM CUPID HEART...|       8|     2.75|2018-12-01 08:26:00|United Kingdom|    84406|   17850.0|
|   536365|KNITTED UNION FLA...|       6|     3.39|2018-12-01 08:26:00|United Kingdom|    84029|   17850.0|
|   536365|RED WOOLLY HOTTIE...|       6|     3.39|2018-12-01 08:26:00|United Kingdom|    84029|   17850.0|
|   536365|SET 7 BABUSHKA NE...|       2|     7.65|2018-12-01 08:26:00|United Kingdom|    22752|   17850.0|
|   536365|GLASS STAR FROSTE

In [45]:
#use Spark SQL check value
dt_final.createOrReplaceTempView("table")
dt_sql = spark.sql("SELECT * FROM table where customerID < 0 ")
dt_sql.show()

+---------+--------------------+--------+---------+-------------------+--------------+---------+----------+
|InvoiceNo|         Description|Quantity|UnitPrice|    InvoiceDateTime|       Country|StockCode|customerID|
+---------+--------------------+--------+---------+-------------------+--------------+---------+----------+
|   536414|                null|      56|      0.0|2018-12-01 11:52:00|United Kingdom|    22139|      -1.0|
|   536544|DECORATIVE ROSE B...|       1|     2.51|2018-12-01 14:32:00|United Kingdom|    21773|      -1.0|
|   536544|DECORATIVE CATS B...|       2|     2.51|2018-12-01 14:32:00|United Kingdom|    21774|      -1.0|
|   536544|  POLKADOT RAIN HAT |       4|     0.85|2018-12-01 14:32:00|United Kingdom|    21786|      -1.0|
|   536544|RAIN PONCHO RETRO...|       2|     1.66|2018-12-01 14:32:00|United Kingdom|    21787|      -1.0|
|   536544|  VINTAGE SNAP CARDS|       9|     1.66|2018-12-01 14:32:00|United Kingdom|    21790|      -1.0|
|   536544|VINTAGE HEADS AND