This notebook focuses on extracting data from a MongoDB NoSQL collection and CSV files stored in the Data Lake. It performs necessary data cleaning and joins the datasets to produce a unified final output table.

In [0]:
import pandas as pd

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date, datediff, current_date


In [0]:
# Getting the access to csv file from the Azure Data Lake through Service Principle
storage_account = ""
application_id = ""
directory_id = ""
# service_credential = dbutils.secrets.get(scope="<scope>",key="<service-credential-key>")

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", "")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [0]:
# Reading all the csv files
base_path = "abfss://olistdata@olistecommadls.dfs.core.windows.net/bronze/"
orders_path = base_path + "olist_orders_dataset.csv"
customers_path = base_path + "olist_customers_dataset.csv"
geolocation_path = base_path + "olist_geolocation_dataset.csv"
items_path = base_path + "olist_order_items_dataset.csv"
payments_path = base_path + "olist_order_payments_dataset"
products_path = base_path + "olist_products_dataset.csv"
sellers_path = base_path + "olist_sellers_dataset.csv"
order_review_path = base_path + "olist_order_reviews_dataset.csv"   
# product_translation_path = base_path + "product_category_name_translation.csv"

customer_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(customers_path)
geolocation_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(geolocation_path)
orders_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(orders_path)
items_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(items_path)   
payments_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(payments_path)
products_df  = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(products_path)
sellers_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(sellers_path)
order_review_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(order_review_path)



In [0]:
import pymongo

Getting NoSQL data directly from the MongoDB database

In [0]:
# importing module
from pymongo import MongoClient

hostname = "kpb69.h.filess.io"
database = "olistDataNoSQL_readyplain"
port = "27018"
username = "olistDataNoSQL_readyplain"
password = "fb90d98a69dea27adc3be65ab015df5f64a080c7"

uri = "mongodb://" + username + ":" + password + "@" + hostname + ":" + port + "/" + database

# Connect with the portnumber and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]


In [0]:
collection = mydatabase["product_categories"]
mongo_data = pd.DataFrame(list(collection.find()))

In [0]:
mongo_data.drop('_id', axis=1, inplace=True)

In [0]:
mongo_spark_df = spark.createDataFrame(mongo_data)

##Cleaning data

In [0]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [0]:
orders_df.count()

99441

In [0]:
display(orders_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in orders_df.columns]))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,0,0,0,160,1783,2965,0


In [0]:
#Removing nulls and dropping duplicates
def clean_data(df, name):
    df_cleaned = df.na.drop(how ="all")
    return df_cleaned.dropDuplicates()

In [0]:
orders_clean_df = clean_data(orders_df, "orders_df")
customer_clean_df = clean_data(customer_df, "customer_df")
geolocation_clean_df = clean_data(geolocation_df, "geolocation_df")
items_clean_df = clean_data(items_df, "items_df")
payments_clean_df = clean_data(payments_df, "payments_df")
products_clean_df = clean_data(products_df, "products_df")
sellers_clean_df = clean_data(sellers_df, "sellers_df")
order_review_df = clean_data(order_review_df, "order_review_df")


# display(orders_clean_df.select([F.sum(F.col(c).

In [0]:
display(orders_clean_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in orders_df.columns]))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,0,0,0,160,1783,2965,0


In [0]:
# Convert Date time stamp to Date Column

orders_clean_df = orders_clean_df.withColumn("order_purchase_date", to_date(col("order_purchase_timestamp"))).withColumn("order_approved_date", to_date(col("order_approved_at"))).withColumn("order_delivered_carrier_date", to_date(col("order_delivered_carrier_date"))).withColumn("order_delivered_customer_date", to_date(col("order_delivered_customer_date"))).withColumn("order_estimated_delivery_date", to_date(col("order_estimated_delivery_date")))

In [0]:
# Calculate Delivery and delay times

orders_clean_df = orders_clean_df.withColumn("actual_delivery_time", datediff(col("order_delivered_customer_date"), col("order_purchase_date"))).withColumn("estimated_delivery_time", datediff(col("order_estimated_delivery_date"), col("order_purchase_date")))
orders_clean_df = orders_clean_df.withColumn("delay time", F.when(col("actual_delivery_time") > F.col("estimated_delivery_time"), col("actual_delivery_time") - col("estimated_delivery_time")).otherwise(0))

In [0]:
# display(orders_clean_df)

In [0]:
# Joining
orders_customer_df = orders_clean_df.join(customer_clean_df, orders_clean_df.customer_id == customer_clean_df.customer_id, "left")
orders_payments_df = orders_customer_df.join(payments_clean_df, orders_customer_df.order_id == payments_clean_df.order_id, "left")
orders_items_df = orders_payments_df.join(items_clean_df, "order_id", "left")
orders_products_df = orders_items_df.join(products_clean_df, orders_items_df.product_id == products_clean_df.product_id, "left")
final_df = orders_products_df.join(sellers_clean_df, orders_items_df.seller_id == sellers_clean_df.seller_id, "left")



In [0]:
final_df = final_df.join(mongo_spark_df, "product_category_name", "left")
display(final_df)

product_category_name,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_purchase_date,order_approved_date,actual_delivery_time,estimated_delivery_time,delay time,customer_id.1,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,order_id.1,payment_sequential,payment_type,payment_installments,payment_value,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,product_id.1,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,seller_id.1,seller_zip_code_prefix,seller_city,seller_state,product_category_name_english
bebes,acce194856392f074dbf9dada14d8d82,7e20bf5ca92da68200643bda76c504c6,delivered,2018-06-04T00:00:13Z,2018-06-05T00:35:10Z,2018-06-05,2018-06-16,2018-07-18,2018-06-04,2018-06-05,12.0,44,0,7e20bf5ca92da68200643bda76c504c6,576ea0cab426cd8a00fad9a9c90a4494,41213,salvador,BA,acce194856392f074dbf9dada14d8d82,1,credit_card,10,227.679993,2.0,9451e630d725c4bb7a5a206b48b99486,d673a59aac7a70d8b01e6902bf090a11,2018-06-13T00:35:10Z,39.5,48.64,9451e630d725c4bb7a5a206b48b99486,52.0,300.0,1.0,350.0,31.0,10.0,12.0,d673a59aac7a70d8b01e6902bf090a11,14940.0,ibitinga,SP,baby
bebes,acce194856392f074dbf9dada14d8d82,7e20bf5ca92da68200643bda76c504c6,delivered,2018-06-04T00:00:13Z,2018-06-05T00:35:10Z,2018-06-05,2018-06-16,2018-07-18,2018-06-04,2018-06-05,12.0,44,0,7e20bf5ca92da68200643bda76c504c6,576ea0cab426cd8a00fad9a9c90a4494,41213,salvador,BA,acce194856392f074dbf9dada14d8d82,1,credit_card,10,227.679993,1.0,d70f38e7f79c630f8ea00c993897042c,977f9f63dd360c2a32ece2f93ad6d306,2018-06-13T00:35:10Z,90.9,48.64,d70f38e7f79c630f8ea00c993897042c,53.0,233.0,1.0,10950.0,41.0,40.0,40.0,977f9f63dd360c2a32ece2f93ad6d306,14910.0,tabatinga,SP,baby
cama_mesa_banho,1d067305b599c1e0dceb3864056ea527,0489975a325480c9e385e9f135bb13c3,delivered,2018-02-14T13:05:17Z,2018-02-14T13:15:38Z,2018-02-20,2018-03-09,2018-03-09,2018-02-14,2018-02-14,23.0,23,0,0489975a325480c9e385e9f135bb13c3,b577af9a54b023b4bcc51f69bb937f1c,14406,franca,SP,1d067305b599c1e0dceb3864056ea527,1,credit_card,5,170.429993,1.0,2a2d22ae30e026f1893083c8405ca522,1a3df491d1c4f1589fc2b934ada68bf2,2018-02-22T12:15:38Z,148.9,21.53,2a2d22ae30e026f1893083c8405ca522,60.0,744.0,2.0,8350.0,50.0,27.0,38.0,1a3df491d1c4f1589fc2b934ada68bf2,89224.0,joinville,SC,bed_bath_table
perfumaria,6942b8da583c2f9957e990d028607019,52006a9383bf149a4fb24226b173106f,shipped,2018-01-10T11:33:07Z,2018-01-11T02:32:30Z,2018-01-11,,2018-02-07,2018-01-10,2018-01-11,,28,0,52006a9383bf149a4fb24226b173106f,528b011eb7fab3d59c336cc7248eed3a,38600,paracatu,MG,6942b8da583c2f9957e990d028607019,1,boleto,1,69.1200027,1.0,ee0c1cf2fbeae95205b4aa506f1469f0,cc419e0650a3c5ba77189a1882b7556a,2018-01-18T02:32:30Z,53.99,15.13,ee0c1cf2fbeae95205b4aa506f1469f0,44.0,334.0,1.0,200.0,16.0,16.0,13.0,cc419e0650a3c5ba77189a1882b7556a,9015.0,santo andre,SP,perfumery
,6f841dde94727854eaff3f66432c80ea,a9c9532060c9d245f06526c633d2dfba,delivered,2018-01-02T19:20:35Z,2018-01-02T19:32:22Z,2018-01-03,2018-01-27,2018-02-05,2018-01-02,2018-01-02,25.0,34,0,a9c9532060c9d245f06526c633d2dfba,5f38cc36e507c4d880489be38964af19,27262,volta redonda,RJ,6f841dde94727854eaff3f66432c80ea,1,credit_card,3,192.949997,1.0,c600d7f13104e8db2ca2b9fa78581409,f18112da765e0971ca5d510dec322bdd,2018-01-08T19:32:22Z,168.9,24.05,c600d7f13104e8db2ca2b9fa78581409,,,,2400.0,42.0,15.0,27.0,f18112da765e0971ca5d510dec322bdd,88351.0,brusque,SC,
beleza_saude,ca290a06ee0945b956f79c93b5191633,acd575d7382968889f41a5a3f37510dd,delivered,2018-04-09T22:02:30Z,2018-04-09T23:10:27Z,2018-04-10,2018-04-25,2018-05-10,2018-04-09,2018-04-09,16.0,31,0,acd575d7382968889f41a5a3f37510dd,323c4e9d78f774df9255793377ce636e,77645,lajeado,TO,ca290a06ee0945b956f79c93b5191633,1,credit_card,1,175.100006,1.0,2948658cb6abc82847412be7201bfc4c,955fee9216a65b617aa5c0531780ce60,2018-04-15T23:10:27Z,120.0,55.1,2948658cb6abc82847412be7201bfc4c,59.0,1030.0,5.0,3500.0,36.0,25.0,21.0,955fee9216a65b617aa5c0531780ce60,4782.0,sao paulo,SP,health_beauty
beleza_saude,3735720b8c1cd4212fa0866a8e58a049,94af59d9cac1ae1976312584f628ef6f,delivered,2018-05-16T23:08:51Z,2018-05-16T23:55:11Z,2018-05-17,2018-05-21,2018-06-20,2018-05-16,2018-05-16,5.0,35,0,94af59d9cac1ae1976312584f628ef6f,c64199b55f928b9302b63c4f9d841fcb,22250,rio de janeiro,RJ,3735720b8c1cd4212fa0866a8e58a049,1,credit_card,8,209.110001,1.0,de80c37c338574091896b7fdc81dc376,5b85809efd0d0e4dea1a9544e1280ed9,2018-05-22T23:55:11Z,189.9,19.21,de80c37c338574091896b7fdc81dc376,49.0,1045.0,1.0,333.0,17.0,17.0,15.0,5b85809efd0d0e4dea1a9544e1280ed9,19020.0,presidente prudente,SP,health_beauty
informatica_acessorios,8b346abc34a6e64bc67fcd3b0eccdc9f,dd1506d329d9b0135855082aae3c79ac,delivered,2018-07-11T20:15:04Z,2018-07-11T20:26:13Z,2018-07-12,2018-07-26,2018-08-09,2018-07-11,2018-07-11,15.0,29,0,dd1506d329d9b0135855082aae3c79ac,22abb23a37bf693f3f66fecb4232c136,60340,fortaleza,CE,8b346abc34a6e64bc67fcd3b0eccdc9f,1,credit_card,7,376.450012,1.0,c75d5edf6cf5302523d622c43a072be0,17e34d8224d27a541263c4c64b11a56b,2018-07-23T20:26:13Z,337.18,39.27,c75d5edf6cf5302523d622c43a072be0,54.0,779.0,2.0,150.0,16.0,19.0,16.0,17e34d8224d27a541263c4c64b11a56b,14085.0,riberao preto,SP,computers_accessories
ferramentas_jardim,a4d7c8bca45b56444e3c59ddcca7d7c9,fcdb673d0f8d2b84d3862193f17a08f9,delivered,2017-08-29T12:36:36Z,2017-08-30T02:05:44Z,2017-09-04,2017-09-08,2017-09-22,2017-08-29,2017-08-30,10.0,24,0,fcdb673d0f8d2b84d3862193f17a08f9,3443aa1df045d475ffb79648c36494e7,2469,sao paulo,SP,a4d7c8bca45b56444e3c59ddcca7d7c9,1,boleto,1,73.3399963,1.0,389d119b48cf3043d311335e499d9c6b,1f50f920176fa81dab994f9023523100,2017-09-05T02:05:44Z,59.9,13.44,389d119b48cf3043d311335e499d9c6b,59.0,341.0,2.0,1750.0,37.0,22.0,40.0,1f50f920176fa81dab994f9023523100,15025.0,sao jose do rio preto,SP,garden_tools
moveis_decoracao,b64b1539563ff515922bd124fd838863,a22203b8cd7210764aa43f442904a076,delivered,2017-05-06T11:48:53Z,2017-05-06T12:47:06Z,2017-05-08,2017-05-24,2017-06-21,2017-05-06,2017-05-06,18.0,46,0,a22203b8cd7210764aa43f442904a076,a30be234246fed38afe084a19a6cc015,60115,fortaleza,CE,b64b1539563ff515922bd124fd838863,1,credit_card,1,47.7400017,1.0,16ba44d9231d19fc57e0a4188d0ce1cf,cca3071e3e9bb7d12640c9fbe2301306,2017-05-11T12:33:39Z,22.9,24.84,16ba44d9231d19fc57e0a4188d0ce1cf,50.0,488.0,1.0,303.0,25.0,2.0,17.0,cca3071e3e9bb7d12640c9fbe2301306,14940.0,ibitinga,SP,furniture_decor


Databricks visualization. Run in Databricks to view.

In [0]:
final_cleaned_df = final_df.drop(*[col for col in final_df.columns if "__" in col])
# display(final_cleaned_df)

In [0]:
def remove_duplicate_cols(df):
    columns = df.columns
    seen_cols = set()
    cols_to_drop = []
    for col in columns:
        if col in seen_cols:
            cols_to_drop.append(col)
        else:
            seen_cols.add(col)
    return df.drop(*cols_to_drop)
final_cleaned_df = remove_duplicate_cols(final_cleaned_df)
# display(final_cleaned_df)

In [0]:
final_cleaned_df.write.mode("overwrite").parquet("abfss://olistdata@olistecommadls.dfs.core.windows.net/silver/")