In [None]:
from pyspark.sql import SparkSession
# Start a Spark session for this notebook, or reuse one that is already
# running, under the application name "Clean Raw Data".
spark = (
    SparkSession.builder
    .appName("Clean Raw Data")
    .getOrCreate()
)

In [2]:
# Load the raw e-commerce export from HDFS, treating the first CSV line as the
# header. No schema is supplied, so the printed schema below shows every
# column as a string.
raw_data_path = "hdfs://localhost:9000/user/hadup/ecommerce/raw/raw_data.csv"
raw_df = spark.read.csv(raw_data_path, header=True)
raw_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- freight_value: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: string (nullable = true)
 |-- product_descri

In [3]:
# Preview the first five rows of the raw frame.
raw_df.show(n=5)

25/07/17 09:33:42 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+--------------------+------------------------+-------------+--------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+------------+--------------------+-------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|  cu

In [5]:
from pyspark.sql.functions import col

# Columns kept for the sales dataset; every other column of the raw export is
# left out of df_sales.
columns_needed = [
    "order_id", "order_status", "order_purchase_timestamp",
    "product_id", "product_category_name", "order_item_id",
    "price", "freight_value", "seller_id",
]

# select() accepts plain column names, so the list can be unpacked directly.
df_sales = raw_df.select(*columns_needed)

In [6]:
# Inspect the narrowed frame: its schema first, then the first five rows with
# cell values shown in full rather than truncated.
df_sales.printSchema()
df_sales.show(n=5, truncate=False)

root
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- freight_value: string (nullable = true)
 |-- seller_id: string (nullable = true)

+--------------------------------+------------+------------------------+--------------------------------+---------------------+-------------+-----+-------------+--------------------------------+
|order_id                        |order_status|order_purchase_timestamp|product_id                      |product_category_name|order_item_id|price|freight_value|seller_id                       |
+--------------------------------+------------+------------------------+--------------------------------+---------------------+-------------+-----+-------------+--------------------------------+
|e481f

In [7]:
from pyspark.sql.types import FloatType, IntegerType, TimestampType

# Target Spark type for each column that needs converting from the string
# values produced by the CSV reader. Columns not listed here stay as strings.
# NOTE(review): with Spark's default (non-ANSI) cast behaviour, values that
# cannot be parsed presumably become null instead of raising — confirm.
column_types = {
    "order_purchase_timestamp": TimestampType(),
    "order_item_id": IntegerType(),
    "price": FloatType(),
    "freight_value": FloatType(),
}

# Replace each listed column, in place, with its cast counterpart.
for column_name, target_type in column_types.items():
    df_sales = df_sales.withColumn(column_name, col(column_name).cast(target_type))

In [8]:
# Print the schema of df_sales to check the column types it now carries.
df_sales.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- freight_value: float (nullable = true)
 |-- seller_id: string (nullable = true)



In [9]:
# Cleaning rules, applied in order:
#   1. drop rows missing any of the key fields (order, product, price, timestamp);
#   2. keep only rows whose price and freight_value are both non-negative;
#   3. keep one row per (order_id, product_id, order_item_id) combination.
# NOTE(review): a null freight_value makes the comparison in step 2 evaluate to
# null, so such rows appear to be removed here as well even though
# freight_value is not in the step-1 subset — confirm this is intended.
df_sales = (
    df_sales
    .na.drop(subset=["order_id", "product_id", "price", "order_purchase_timestamp"])
    .where((col("price") >= 0) & (col("freight_value") >= 0))
    .drop_duplicates(["order_id", "product_id", "order_item_id"])
)

In [11]:
# Each write below is a separate Spark action. Without caching, every action
# re-executes the whole lineage (CSV read, casts, filters, de-duplication).
# Because dropDuplicates does not guarantee which of several duplicate rows is
# kept, the CSV and Parquet copies could then contain different rows. Marking
# the frame as cached makes the first write materialise it once, and later
# writes reuse that same materialisation.
df_sales.cache()

# Persist the cleaned data to HDFS in both CSV (with a header row) and Parquet
# form, replacing any previous output at these locations.
df_sales.write.mode("overwrite").csv("hdfs://localhost:9000/user/hadup/ecommerce/clean/csv", header=True)
df_sales.write.mode("overwrite").parquet("hdfs://localhost:9000/user/hadup/ecommerce/clean/parquet")

                                                                                

In [None]:
# Additional Parquet copy on the local filesystem, replacing any previous
# export at that location.
local_parquet_path = "file:///home/hadup/ecommerce_clean_local/parquet"
df_sales.write.parquet(local_parquet_path, mode="overwrite")

In [None]:
# Stop the Spark session used by this notebook.
spark.stop()