## Load All Dataset

In [1]:
from pyspark.sql import SparkSession
from CustomLib.Config import postgres_config
from pyspark.sql.functions import col, when, lit, expr, unix_timestamp, to_timestamp

In [2]:
# Initialize Spark Session
sparkMaster = 'local[*]'
sparkAppName = 'Py-Brick_Brazilian_Olist_Dataset'
spark = SparkSession \
    .builder \
    .master(sparkMaster) \
    .appName(sparkAppName) \
    .config("spark.jars", "./jdbc/postgresql-42.6.0.jar") \
    .config("spark.jars.packages","io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [3]:
def LoadAllDataset():
    dataset_folderpath = "D:/00 Project/00 My Project/Dataset/Brazilian_Olist/"
    options = {
        "inferSchema": "true",
        "delimiter": ",",
        "header": "true"
    }
    df_order_items = spark.read.options(**options).csv(dataset_folderpath +  "1_olist_order_items_dataset.csv")
    df_order_payments = spark.read.options(**options).csv(dataset_folderpath + "2_olist_order_payments_dataset.csv")
    df_orders = spark.read.options(**options).csv(dataset_folderpath + "3_olist_orders_dataset.csv")
    df_products = spark.read.options(**options).csv(dataset_folderpath + "4_olist_products_dataset.csv")
    df_sellers = spark.read.options(**options).csv(dataset_folderpath + "5_olist_sellers_dataset.csv")
    return df_order_items, df_order_payments, df_orders, df_products, df_sellers

df_order_items, df_order_payments, df_orders, df_products, df_sellers = LoadAllDataset()

In [4]:
print(df_order_items.printSchema)
print("Lenght: ", df_order_items.count())

<bound method DataFrame.printSchema of DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]>
Lenght:  112650


In [5]:
print(df_order_payments.printSchema)
print("Lenght: ", df_order_payments.count())

<bound method DataFrame.printSchema of DataFrame[order_id: string, payment_sequential: int, payment_type: string, payment_installments: int, payment_value: double]>
Lenght:  103886


In [6]:
print(df_orders.printSchema)
print("Lenght: ", df_orders.count())

<bound method DataFrame.printSchema of DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]>
Lenght:  99441


In [7]:
print(df_products.printSchema)
print("Lenght: ", df_products.count())

<bound method DataFrame.printSchema of DataFrame[product_id: string, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int]>
Lenght:  32951


In [8]:
print(df_sellers.printSchema)
print("Lenght: ", df_sellers.count())

<bound method DataFrame.printSchema of DataFrame[seller_id: string, seller_zip_code_prefix: int, seller_city: string, seller_state: string]>
Lenght:  3095


## Save into Data Lake

In [11]:
df_order_items.filter(df_order_items.price < 100).write.format("delta").mode("overwrite").save("./delta_lake/spark_order_items")
# df_order_payments.write.format("delta").mode("overwrite").save("./delta_lake/spark_order_payments")
# df_orders.write.format("delta").mode("overwrite").save("./delta_lake/spark_orders")
# df_products.write.format("delta").mode("overwrite").save("./delta_lake/spark_products")
# df_sellers.write.format("delta").mode("overwrite").save("./delta_lake/spark_sellers")

## Read from Data Lake

In [12]:
df_order_items_2 = spark.read.format("delta").load("./delta_lake/spark_order_items")
print("Lenght: ", df_order_items_2.count())
print(df_order_items_2.show(3))

Lenght:  72165
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|cfea2c141cacb975b...|            1|7d018f8e4f93213ba...|3d871de0142ce09b7...|2017-11-17 15:50:36| 19.9|        25.63|
|cfea8d218fdcd01ec...|            1|73326828aa5efe1ba...|d13e50eaa47b4cbe9...|2018-08-21 02:10:08| 81.9|         8.73|
|cfead345fd3958937...|            1|2e830c73f28d3b854...|8b321bb669392f516...|2018-04-19 12:55:44| 11.6|         7.39|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
only showing top 3 rows

None


## Insert Data into PostgreSQL

In [121]:
def add_data_postgres(df, table_name):
    # conn_string = f'postgresql://{postgres_config["username"]}:{postgres_config["password"]}@{postgres_config["hostname"]}:{postgres_config["port"]}/{postgres_config["database"]}'
    
    database_url = f'jdbc:postgresql://{postgres_config["hostname"]}:{postgres_config["port"]}/{postgres_config["database"]}'
    table_name = f'"{postgres_config["schema"]}".{table_name}'
    properties = {
        "user": postgres_config["username"],
        "password": postgres_config["password"],
        "driver": "org.postgresql.Driver"
    }
    df.write \
    .jdbc(url=database_url, table=table_name, mode="overwrite", properties=properties)

add_data_postgres(df_order_items, "spark_order_items")
add_data_postgres(df_order_payments, "spark_order_payments")
add_data_postgres(df_orders, "spark_orders")
add_data_postgres(df_products, "spark_products")
add_data_postgres(df_sellers, "spark_sellers")

## Read Data from PostgreSQL

In [55]:
database_url = f'jdbc:postgresql://{postgres_config["hostname"]}:{postgres_config["port"]}/{postgres_config["database"]}'

df_orders = spark.read.format("jdbc").option("url", database_url) \
    .option("driver", "org.postgresql.Driver").option("dbtable", f'"{postgres_config["schema"]}".spark_orders') \
    .option("user", postgres_config["username"]).option("password", postgres_config["password"]).load()
df_order_payments = spark.read.format("jdbc").option("url", database_url) \
    .option("driver", "org.postgresql.Driver").option("dbtable", f'"{postgres_config["schema"]}".spark_order_payments') \
    .option("user", postgres_config["username"]).option("password", postgres_config["password"]).load()
df_order_items = spark.read.format("jdbc").option("url", database_url) \
    .option("driver", "org.postgresql.Driver").option("dbtable", f'"{postgres_config["schema"]}".spark_order_items') \
    .option("user", postgres_config["username"]).option("password", postgres_config["password"]).load()
df_products = spark.read.format("jdbc").option("url", database_url) \
    .option("driver", "org.postgresql.Driver").option("dbtable", f'"{postgres_config["schema"]}".spark_products') \
    .option("user", postgres_config["username"]).option("password", postgres_config["password"]).load()
df_sellers = spark.read.format("jdbc").option("url", database_url) \
    .option("driver", "org.postgresql.Driver").option("dbtable", f'"{postgres_config["schema"]}".spark_sellers') \
    .option("user", postgres_config["username"]).option("password", postgres_config["password"]).load()

df_orders.createOrReplaceTempView("spark_orders")
df_order_payments.createOrReplaceTempView("spark_order_payments")
df_order_items.createOrReplaceTempView("spark_order_items")
df_products.createOrReplaceTempView("spark_products")
df_sellers.createOrReplaceTempView("spark_sellers")

In [14]:
df_orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|d9eabc69f974b3d08...|d4be0795e8fa7ea79...|   delivered|     2018-03-21 19:32:50|2018-03-21 19:48:04|         2018-03-22 19:48:46|          2018-03-28 18:56:52|          2018-04-13 00:00:00|
|c9fbf88f9c58364e0...|7ec5b53960d508118...|   delivered|     2018-02-05 16:47:14|2018-02-05 17:16:09|         2018-02-07 15:04:45|          2018-02-20 19:59:00|          2018-03-12 00:00:00|
|fd01a48a7d75383a3...|014f2d069b53eec84...|  

In [15]:
df_order_payments.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|e558bf8bb8fddae94...|                 1| credit_card|                   1|       371.13|
|2095778434755b568...|                 1| credit_card|                   2|        55.11|
|5c1995c020e0a3b2b...|                 1| credit_card|                   1|        165.8|
|ab459ccd2e24ef6d6...|                 1|      boleto|                   1|       188.62|
|9b3d13c2644e10e20...|                 1| credit_card|                   5|        65.71|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [16]:
df_order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|45193dbb3e96a6b68...|            2|a52c53c58fd2105ad...|3b15288545f8928d3...|2017-10-02 16:15:08|106.99|        17.25|
|4519ce49b67354e83...|            1|e09134e776e503444...|6fd52c528dcb38be2...|2018-05-02 02:15:20|129.89|         9.26|
|4519d054994236d26...|            1|e7a7e6b2b4959a39a...|2e90cb1677d35cfe2...|2017-04-18 02:45:21| 18.16|        14.52|
|4519e07ee266cdb76...|            1|9a29b754b7fc0aa8e...|0bf0150d5b9d60d9c...|2017-11-30 17:20:33| 106.0|        20.23|
|4519eb4d4f3192015...|            1|fde71f25e699ca0a2...|bc2ac6b95e1accce9...|2017-08-25 19:25:17|  61.0|         9.34|
+--------------------+-------------+----

In [17]:
df_products.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [18]:
df_sellers.show(5)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



## Data Manipulation

In [19]:
## Get Settlement Status
df_settlement_status = spark.sql("""
SELECT 
	so.customer_id,
	finall.*
FROM (
	SELECT *,
		CASE
		     WHEN pr.payment_value = pr.cost
		          THEN 'settled'
		     WHEN pr.payment_value < pr.cost
		          THEN 'underpaid'
		     WHEN pr.payment_value > pr.cost
		          THEN 'overpaid'
		 END payment_status
	FROM (
		SELECT 
			sop.order_id, 
			sop.payment_type,
			sop.payment_sequential,
			ROUND(CAST(sop.payment_value as NUMERIC), 0) AS payment_value,
			soi.last_order_item_id,
			soi.price, 
			soi.freight_value,
			ROUND(CAST(cost as NUMERIC), 0) AS cost
		FROM (
			SELECT 
				order_id, 
				MAX(payment_type) AS payment_type, 
				MAX(payment_sequential) AS payment_sequential,
				SUM(payment_value) AS payment_value
			FROM spark_order_payments
			GROUP BY order_id
		) sop
		INNER JOIN (
			SELECT
				order_id, MAX(order_item_id) AS last_order_item_id, 
				SUM(price) AS price, 
				SUM(freight_value) AS freight_value, 
				SUM(price) + SUM(freight_value) AS cost
			FROM spark_order_items
			GROUP BY order_id
		) soi
		ON sop.order_id = soi.order_id
	) pr
) finall
INNER JOIN spark_orders so
ON so.order_id = finall.order_id
WHERE 
	payment_status != 'settled' AND 
	payment_status = 'underpaid'
ORDER BY order_id, payment_sequential
""")

df_settlement_status.show(5)

+--------------------+--------------------+------------+------------------+-------------+------------------+-----+-------------+----+--------------+
|         customer_id|            order_id|payment_type|payment_sequential|payment_value|last_order_item_id|price|freight_value|cost|payment_status|
+--------------------+--------------------+------------+------------------+-------------+------------------+-----+-------------+----+--------------+
|e13a5ed086aacc9b2...|051fcda88d997d3ff...| credit_card|                 1|           52|                 1| 49.0|          7.6|  57|     underpaid|
|198b639c2da0e7f71...|0e556f5eafbf3eb39...| credit_card|                 1|           82|                 2| 49.8|        45.86|  96|     underpaid|
|2a69da30b726b7f3d...|262118ce178bb3e45...| credit_card|                 1|          126|                 2|119.8|        57.94| 178|     underpaid|
|02ae654741ad89536...|320dcf288de15e4ff...| credit_card|                 1|           81|                 

In [20]:
## Get Most Sold Products
df_most_sold_product = spark.sql("""
SELECT *,
	DENSE_RANK () OVER (
		ORDER BY sold_number DESC
	) sold_rank_perworld,
	DENSE_RANK () OVER (
		PARTITION BY seller_id
		ORDER BY sold_number DESC
	) sold_rank_perseller
	FROM (
	SELECT si.product_id, si.seller_id, spr.product_category_name, sse.seller_city, COUNT(si.product_id) AS sold_number
	FROM spark_order_items si
	INNER JOIN spark_products spr
	ON si.product_id = spr.product_id
	INNER JOIN spark_sellers sse
	ON si.seller_id = sse.seller_id
	GROUP BY si.product_id, si.seller_id, product_category_name, seller_city
	ORDER BY sold_number DESC
) sold_prod
ORDER BY sold_number DESC, sold_rank_perworld ASC
""")

df_most_sold_product.show(5)

+--------------------+--------------------+---------------------+--------------------+-----------+------------------+-------------------+
|          product_id|           seller_id|product_category_name|         seller_city|sold_number|sold_rank_perworld|sold_rank_perseller|
+--------------------+--------------------+---------------------+--------------------+-----------+------------------+-------------------+
|aca2eb7d00ea1a7b8...|955fee9216a65b617...|     moveis_decoracao|           sao paulo|        527|                 1|                  1|
|422879e10f4668299...|1f50f920176fa81da...|   ferramentas_jardim|sao jose do rio p...|        484|                 2|                  1|
|99a4788cb24856965...|4a3ca9315b744ce9f...|      cama_mesa_banho|            ibitinga|        482|                 3|                  1|
|389d119b48cf3043d...|1f50f920176fa81da...|   ferramentas_jardim|sao jose do rio p...|        392|                 4|                  2|
|368c6c730842d7801...|1f50f920176f

In [21]:
df_delayed_shipping = spark.sql("""
SELECT 
	cost_tbl.*,
	customer_id,
	order_status,
	order_delivered_carrier_date
FROM (
	SELECT
		order_id,
		seller_id,
		shipping_limit_date,
		SUM(price) AS price, 
		SUM(freight_value) AS freight_value, 
		ROUND(CAST(SUM(price) + SUM(freight_value) AS NUMERIC), 0) AS cost
	FROM spark_order_items
	GROUP BY order_id, seller_id, shipping_limit_date
) cost_tbl
INNER JOIN spark_orders so
ON so.order_id = cost_tbl.order_id
WHERE so.order_delivered_carrier_date IS NOT NULL
""")

df_delayed_shipping.unpersist()
df_delayed_shipping = df_delayed_shipping \
    .withColumn("delayed_duration_s", unix_timestamp("order_delivered_carrier_date") - unix_timestamp("shipping_limit_date")) \
    .withColumn("delayed_status", expr("CASE WHEN delayed_duration_s > 0 THEN 'Delayed' ELSE 'On Time' END"))
    # .withColumn("order_delivered_carrier_date", to_timestamp(col("order_delivered_carrier_date"))) \
    # .withColumn("shipping_limit_date", to_timestamp(col("shipping_limit_date"))) \
df_delayed_shipping = df_delayed_shipping.filter(df_delayed_shipping.delayed_duration_s > 0)
df_delayed_shipping.show(5)

+--------------------+--------------------+-------------------+-----+-------------+----+--------------------+------------+----------------------------+------------------+--------------+
|            order_id|           seller_id|shipping_limit_date|price|freight_value|cost|         customer_id|order_status|order_delivered_carrier_date|delayed_duration_s|delayed_status|
+--------------------+--------------------+-------------------+-----+-------------+----+--------------------+------------+----------------------------+------------------+--------------+
|00018f77f2f0320c5...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93| 260|f6dd3ec061db4e398...|   delivered|         2017-05-04 14:35:00|             98987|       Delayed|
|0094bd07f49fed902...|5670f4db5b62c43d5...|2018-05-09 04:12:53|11.53|         7.39|  19|de0c1a4d8c367c58d...|   delivered|         2018-05-10 19:19:00|            140767|       Delayed|
|00995d799817ecc3b...|620c87c171fb2a6dd...|2018-08-01 11:45:13| 49.9| 

In [22]:
df_most_loyal_cust = spark.sql("""
SELECT
	customer_id,
	COUNT(order_id) AS count_transaction,
	ROUND(CAST(SUM(payment_value) as NUMERIC), 0) AS transactions_amt
FROM (
	SELECT 
		so.order_id,
		so.customer_id,
		sop.payment_sequential,
		sop.payment_type,
		sop.payment_installments,
		sop.payment_value
	FROM (
		SELECT *
		FROM spark_orders
		WHERE order_status != 'unavailable' AND order_status != 'canceled'
	) so
	INNER JOIN spark_order_payments sop
	ON sop.order_id = so.order_id
	ORDER BY sop.order_id
	) trx
GROUP BY customer_id, order_id
ORDER BY count_transaction DESC, transactions_amt DESC
""")
df_most_loyal_cust.show(5)

+--------------------+-----------------+----------------+
|         customer_id|count_transaction|transactions_amt|
+--------------------+-----------------+----------------+
|9af2372a1e4934027...|               29|             458|
|92cd3ec6e2d643d4e...|               26|              63|
|b246eeed30b362c09...|               22|              41|
|270c23a11d024a44c...|               21|             161|
|13aa59158da63ba0e...|               19|             206|
+--------------------+-----------------+----------------+
only showing top 5 rows



In [59]:
df_order_items = df_order_items \
    .withColumn("freight_efficiency", col("freight_value") / col("price")) \
    .withColumn("freight_category",
      when((col("freight_efficiency") >= 0) & (col("freight_efficiency") <= 10), "A") \
                .when((col("freight_efficiency") >= 11) & (col("freight_efficiency") <= 20), "B"))
df_order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-------------------+----------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value| freight_efficiency|freight_category|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-------------------+----------------+
|45193dbb3e96a6b68...|            2|a52c53c58fd2105ad...|3b15288545f8928d3...|2017-10-02 16:15:08|106.99|        17.25| 0.1612300214973362|               A|
|4519ce49b67354e83...|            1|e09134e776e503444...|6fd52c528dcb38be2...|2018-05-02 02:15:20|129.89|         9.26|0.07129109246285319|               A|
|4519d054994236d26...|            1|e7a7e6b2b4959a39a...|2e90cb1677d35cfe2...|2017-04-18 02:45:21| 18.16|        14.52| 0.7995594713656388|               A|
|4519e07ee266cdb76...|            1|9a29b754b7fc0aa8e...|0