In [1]:
from pyspark.sql import SparkSession


In [2]:
from pyspark.sql.functions import *

In [3]:
# Build (or reuse) the local Spark session for this notebook.
# NOTE: if a session already exists, getOrCreate() returns it and only runtime SQL
# configs take effect — the captured warning below shows that happened in this run.
spark = (
    SparkSession.builder
    .appName("Data Cleaning")
    .master("local[*]")
    .getOrCreate()
)

25/07/09 04:01:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Load the Olist customers table; first row is the header, column types are inferred.
CUSTOMERS_CSV = "/tmp/brazilian-ecommerce/olist_customers_dataset.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(CUSTOMERS_CSV)
)

                                                                                

In [9]:
# delete null rows: drop every row that has a null in any column (na.drop is the alias of dropna).
# NOTE(review): the result is bound to `df_cleaned`, which no later cell shown here reads —
# downstream steps keep operating on `df`, nulls included. Confirm whether this is meant as an
# illustrative alternative to the fill below, or whether `df = df.dropna()` was intended.
df_cleaned = df.na.drop()

In [10]:
# replace null values in customer_id with a fixed sentinel.
# NOTE(review): customer_id is presumably a string column; Spark casts the numeric fill value
# to the column's type, so nulls become "10" and every filled row shares that synthetic id.
replace_value = 10
fill_map = {"customer_id": replace_value}
df = df.na.fill(fill_map)

In [12]:
# handle duplicate values: first remove rows identical across all columns,
# then keep a single row per (customer_id, customer_city) pair.
dedup_keys = ["customer_id", "customer_city"]
df = df.dropDuplicates().dropDuplicates(dedup_keys)

In [13]:
# change data type: cast the zip-code prefix to float; later cells compare it against
# float bounds and compute quantiles on it.
zip_col = "customer_zip_code_prefix"
df = df.withColumn(zip_col, col(zip_col).cast("float"))

In [15]:
# filter invalid data: keep zip prefixes strictly between 5000 and 15000 (both bounds exclusive).
# NOTE(review): the rationale for this range is not recorded here — confirm it is the intended validity rule.
zip_prefix = col("customer_zip_code_prefix")
in_valid_range = (zip_prefix > 5000.0) & (zip_prefix < 15000.0)
df = df.where(in_valid_range)

In [16]:
# handle inconsistent data: upper-case city names so spellings that differ only by
# letter case compare equal.
city_upper = upper(col("customer_city"))
df = df.withColumn("customer_city", city_upper)

In [19]:
# handle outliers: keep rows whose zip prefix lies within Tukey's fences
# [Q1 - 1.5*IQR, Q3 + 1.5*IQR] (both ends inclusive).
#
# The bounds are deliberately NOT named `lower`/`upper`: `from pyspark.sql.functions import *`
# brings Spark's `lower`/`upper` column functions into scope, and rebinding those names to
# floats makes any later or re-run `upper(col(...))` fail with "'float' object is not callable".
IQR_MULTIPLIER = 1.5
QUANTILE_REL_ERROR = 0.01  # approxQuantile relative error; 0.0 computes exact quantiles (more expensive)

quantiles = df.approxQuantile("customer_zip_code_prefix", [0.25, 0.75], QUANTILE_REL_ERROR)

# approxQuantile returns an empty list when the column has no non-null values (e.g. an empty
# frame after the earlier filters); there is nothing to trim then, so skip instead of failing
# on tuple unpacking.
if quantiles:
    Q1, Q3 = quantiles
    IQR = Q3 - Q1
    lower_bound = Q1 - IQR_MULTIPLIER * IQR
    upper_bound = Q3 + IQR_MULTIPLIER * IQR

    # between() is inclusive on both ends, matching the original >= / <= comparison.
    df = df.filter(col("customer_zip_code_prefix").between(lower_bound, upper_bound))

                                                                                

In [20]:
# rename columns: map original column names to their shorter replacements.
column_renames = {"customer_city": "city"}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

In [4]:
# Load the Olist order-items table; first row is the header, column types are inferred.
ORDER_ITEMS_CSV = "/tmp/brazilian-ecommerce/olist_order_items_dataset.csv"
df2 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(ORDER_ITEMS_CSV)
)

                                                                                

In [5]:
# change string to timestamp: parse shipping_limit_date with an explicit pattern
# (the schema printed below shows the resulting timestamp type).
ts_pattern = "yyyy-MM-dd HH:mm:ss"
df2 = df2.withColumn("shipping_limit_date", to_timestamp(col("shipping_limit_date"), ts_pattern))

In [6]:
# Print the column names, types and nullability of df2 as a tree (output below).
df2.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [8]:
# handle date and time: derive calendar components from shipping_limit_date.
# Dict order fixes the order the new columns are appended: year, month, day, hour.
date_part_extractors = {
    "year": year,
    "month": month,
    "day": dayofmonth,
    "hour": hour,
}
for part_name, extractor in date_part_extractors.items():
    df2 = df2.withColumn(part_name, extractor("shipping_limit_date"))


In [9]:
# Preview the first 20 rows; long string values are truncated to 20 characters.
df2.show(n=20, truncate=True)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----+-----+---+----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|year|month|day|hour|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+----+-----+---+----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|2017|    9| 19|   9|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|2017|    5|  3|  11|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|2018|    1| 18|  14|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|2018|    8| 15|  10|
|00042b26cf59d7ce6..

In [10]:
# Shut down the Spark session; any later cell that uses `spark` or its DataFrames needs a new session.
spark.stop()