In [None]:
#finding spark from my local environment
import findspark
findspark.init()
findspark.find()

'c:\\Anaconda\\envs\\spkue\\lib\\site-packages\\pyspark'

In [None]:
#initialising pyspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark= SparkSession.builder.appName("data cleaning").getOrCreate()
spark

In [None]:
#loading csv data for cleaning
sample = spark.read.format("csv").option("InferSchema",True).option("header",True).load("C:/Users/gabri/Downloads/superstore.csv")

In [65]:
sample.show(10000,truncate=False)

+------+--------------+----------+----------+--------------+-----------+----------------------+-----------+-------------+-----------------+--------------------+-----------+-------+---------------+---------------+------------+-------------------------------------------------------------------------------------------------------------------------------+----------------------------------+---------------+-------------------+----------+
|Row ID|Order ID      |Order Date|Ship Date |Ship Mode     |Customer ID|Customer Name         |Segment    |Country      |City             |State               |Postal Code|Region |Product ID     |Category       |Sub-Category|Product Name                                                                                                                   |Sales                             |Quantity       |Discount           |Profit    |
+------+--------------+----------+----------+--------------+-----------+----------------------+-----------+-------------+-------

In [None]:
#function to display 5 rows per dataframe
def displays(df):
    return df.show(5,truncate=False)

In [None]:
# Outlining our dimension tables and facts table
displays(sample)

sales = sample.select(col("Row ID"),col("Order Date"),col("Ship Date"),col("Ship Mode")
                      ,col("Customer ID"),col("Postal Code"),col("Product ID"),col("Sales"),col("Quantity")
                      ,col("Discount"),col("Profit"))
customer =sample.select(col("Customer ID"),col("Customer Name"),col("Segment"),col("Country"),col("City"),col("State"),col("Postal Code"),col("Region"))

product = sample.select(col("Product ID"),col("Product Name"),col("Category"),col("Sub-Category"))

displays(sales)
displays(customer)
displays(product)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+-----------------------------------------------------------+--------+--------+--------+--------+
|Row ID|Order ID      |Order Date|Ship Date |Ship Mode     |Customer ID|Customer Name  |Segment  |Country      |City           |State     |Postal Code|Region|Product ID     |Category       |Sub-Category|Product Name                                               |Sales   |Quantity|Discount|Profit  |
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+-----------------------------------------------------------+--------+--------+--------+--------+
|1     |CA-2016-152156|11/8/2016 |11/11/2016|Second Class  |CG-12520   |Claire Gute    |Consumer |Un

In [None]:
#Data Cleaning process

from pyspark.sql.window import Window

duplicates= customer.groupBy(col("Customer ID")).agg(count(col("Customer ID")).alias("count")).filter(col("count")>1)

displays(duplicates)

clean_customer =customer.withColumn("flag",row_number().over(Window.partitionBy(col("Customer ID")).orderBy(asc(col("Customer ID"))))).filter(col("flag")==1)\
                        .withColumn("customer_key",row_number().over(Window.orderBy(asc(col("Customer ID"))))).drop("flag")\
                        .withColumnRenamed("Customer ID","customer_id").withColumnRenamed("Customer Name","name").withColumnRenamed("Segment","segment")\
                        .withColumnRenamed("Country","country").withColumnRenamed("City","city").withColumnRenamed("State","state")\
                        .withColumnRenamed("Postal Code","postal_code").withColumnRenamed("Region","region").withColumnRenamed("State","state")

displays(clean_customer)

product_duplicates =product.groupBy(col("Product ID")).agg(count(col("Product ID")).alias("count")).filter(col("count")>1)
displays(product_duplicates)

clean_product=product.withColumn("flag",row_number().over(Window.partitionBy(col("Product ID")).orderBy(asc(col("Product ID"))))).filter(col("flag")==1)\
                    .withColumn("product_key",row_number().over(Window.orderBy(asc(col("Product ID"))))).drop("flag")\
                    .withColumnRenamed("Product ID","product_id").withColumnRenamed("Product Name","product_name").withColumnRenamed("Category","category")\
                    .withColumnRenamed("Sub-Category","sub-category")
displays(clean_product)

clean_sales = sales.withColumnRenamed("Row ID","row_id").withColumnRenamed("Order Date","order_date").withColumnRenamed("Ship Date","ship_date")\
                        .withColumnRenamed("Ship Mode","ship_mode").withColumnRenamed("Customer ID","customer_id").withColumnRenamed("Postal Code","postal_code")\
                        .withColumnRenamed("Product ID","product_id").withColumnRenamed("Sales","sales").withColumnRenamed("Quantity","quantity")\
                        .withColumnRenamed("Discount","discount").withColumnRenamed("Profit","profit")

+-----------+-----+
|Customer ID|count|
+-----------+-----+
|VW-21775   |18   |
|PB-19210   |2    |
|RR-19315   |4    |
|EM-13960   |6    |
|MY-17380   |13   |
+-----------+-----+
only showing top 5 rows
+-----------+-------------+--------+-------------+-----------+--------------+-----------+-------+------------+
|customer_id|name         |segment |country      |city       |state         |postal_code|region |customer_key|
+-----------+-------------+--------+-------------+-----------+--------------+-----------+-------+------------+
|AA-10315   |Alex Avila   |Consumer|United States|Minneapolis|Minnesota     |55407      |Central|1           |
|AA-10375   |Allen Armold |Consumer|United States|Mesa       |Arizona       |85204      |West   |2           |
|AA-10480   |Andrew Allen |Consumer|United States|Concord    |North Carolina|28027      |South  |3           |
|AA-10645   |Anna Andreadi|Consumer|United States|Chester    |Pennsylvania  |19013      |East   |4           |
|AB-10015   |Aaron 

In [43]:
fact_sales = clean_sales.join(clean_customer,clean_customer.customer_id ==clean_sales.customer_id,how="left")\
                        .drop("customer_id","city","state","postal_code","region","name","segment","country")
displays(fact_sales)

fact_sales =fact_sales.join(clean_product,clean_product.product_id ==fact_sales.product_id,how="left")
displays(fact_sales)

facts_sales =fact_sales.drop("product_name","product_id","category","sub-category")
displays(facts_sales)

+------+----------+----------+--------------+---------------+--------+--------+--------+--------+------------+
|row_id|order_date|ship_date |ship_mode     |product_id     |sales   |quantity|discount|profit  |customer_key|
+------+----------+----------+--------------+---------------+--------+--------+--------+--------+------------+
|1     |11/8/2016 |11/11/2016|Second Class  |FUR-BO-10001798|261.96  |2       |0       |41.9136 |144         |
|2     |11/8/2016 |11/11/2016|Second Class  |FUR-CH-10000454|731.94  |3       |0       |219.582 |144         |
|3     |6/12/2016 |6/16/2016 |Second Class  |OFF-LA-10000240|14.62   |2       |0       |6.8714  |238         |
|4     |10/11/2015|10/18/2015|Standard Class|FUR-TA-10000577|957.5775|5       |0.45    |-383.031|706         |
|5     |10/11/2015|10/18/2015|Standard Class|OFF-ST-10000760|22.368  |2       |0.2     |2.5164  |706         |
+------+----------+----------+--------------+---------------+--------+--------+--------+--------+------------+
o

In [None]:
# rearranging our column tables and displaying the results

dim_customer = clean_customer
dim_product = clean_product
facts_sales =facts_sales

dim_customer =dim_customer.select("customer_key","customer_id","name","country","city","state","postal_code","region")
dim_product =dim_product.select("product_key","product_id","product_name","category","sub-category")
facts_sales =facts_sales.select("row_id","product_key","customer_key","order_date","ship_date","ship_mode","sales","quantity","discount","profit")

displays(dim_customer)
displays(dim_product)
displays(facts_sales)


+------------+-----------+-------------+-------------+-----------+--------------+-----------+-------+
|customer_key|customer_id|name         |country      |city       |state         |postal_code|region |
+------------+-----------+-------------+-------------+-----------+--------------+-----------+-------+
|1           |AA-10315   |Alex Avila   |United States|Minneapolis|Minnesota     |55407      |Central|
|2           |AA-10375   |Allen Armold |United States|Mesa       |Arizona       |85204      |West   |
|3           |AA-10480   |Andrew Allen |United States|Concord    |North Carolina|28027      |South  |
|4           |AA-10645   |Anna Andreadi|United States|Chester    |Pennsylvania  |19013      |East   |
|5           |AB-10015   |Aaron Bergman|United States|Seattle    |Washington    |98103      |West   |
+------------+-----------+-------------+-------------+-----------+--------------+-----------+-------+
only showing top 5 rows
+-----------+---------------+-----------------------------