## Configuring Databricks to connect with ADLS Gen 2 storage

In [0]:
# Variables for configuration
storage_account = "ecomdatastorageaccount"
application_id = "45c44a3d-747a-4d28-8bf9-a71324d89b68"
directory_id = "f8265e58-2ac1-4ddb-ac4a-7fa5a1215a64"
service_credential = ".Kp8Q~4HQReCI1Zm1uUrDNYVLqzjzLAkhAeL4ay9"

# Code to connect databricks to adls gen 2
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", service_credential)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [0]:
# df = spark.read.\
#     format("csv").\
#         option("header", "true").\
#             option("inferSchema","true").\
#                 load("abfss://olistdata@ecomdatastorageaccount.dfs.core.windows.net/Bronze/olist_customers_dataset.csv")

# display(df)

## Reading the data present in adls gen 2 bronze layer

In [0]:
base_path = "abfss://olistdata@ecomdatastorageaccount.dfs.core.windows.net/Bronze/"

customer_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_customers_dataset.csv")
orders_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_orders_dataset.csv")
products_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_products_dataset.csv")
sellers_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_sellers_dataset.csv")
order_items_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_order_items_dataset.csv")
order_payments_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_order_payments_dataset.csv")
order_reviews_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_order_reviews_dataset.csv")
geolocation_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(base_path + "olist_geolocation_dataset.csv")

# display(customer_df)
# display(orders_df)
# display(products_df)
# display(sellers_df)
# display(order_items_df)
# display(order_payments_df)
# display(order_reviews_df)
# display(geolocation_df)

## Connecting databricks with mongodb

In [0]:
# importing module
from pymongo import MongoClient

hostname = "k3pcq.h.filess.io"
database = "ecomNoSQLDB_newbillbit"
port = "27018"
username = "ecomNoSQLDB_newbillbit"
password = "fdfe458a17d57fb8eeeafa728131973a77a6d45c"

uri = "mongodb://" + username + ":" + password + "@" + hostname + ":" + port + "/" + database

# Connect with the portnumber and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]

mydatabase


Database(MongoClient(host=['k3pcq.h.filess.io:27018'], document_class=dict, tz_aware=False, connect=True), 'ecomNoSQLDB_newbillbit')

In [0]:
#checking data sent from mongo db
import pandas as pd

collection = mydatabase["product_categories"]  # Choose a suitable name for your collection

mongo_data = pd.DataFrame(list(collection.find()))

mongo_data.head()

Unnamed: 0,_id,product_category_name,product_category_name_english
0,6824265264d4286032e325a4,beleza_saude,health_beauty
1,6824265264d4286032e325a5,informatica_acessorios,computers_accessories
2,6824265264d4286032e325a6,automotivo,auto
3,6824265264d4286032e325a7,cama_mesa_banho,bed_bath_table
4,6824265264d4286032e325a8,moveis_decoracao,furniture_decor


In [0]:
#Converting Pandas to Sparkdf
#mongo_spark_df = spark.createDataFrame(mongo_data)
#but if we directly run above then error will be there as "_id" field is there which is not acceptable in spark df
#dropping _id field in pandas
mongo_data = mongo_data.drop("_id", axis=1) if 'mongo_data' in locals() else pd.DataFrame()

In [0]:
#Converting Pandas to Sparkdf
spark_mongo_df = spark.createDataFrame(mongo_data)

# display(spark_mongo_df)

# Data Cleaning

In [0]:
#Remove duplicate rows from the DataFrame.
#.na.drop('all')->
'''
Removes rows where all columns are null (None/NaN).
drop('all') -> 'all' means the row will only be removed if every column is null.
If even one column has a value, the row will stay.
'''

import pyspark.sql.types as T 
import pyspark.sql.functions as F

def cleaning_duplicates_na(df, name):
    print("Cleaning " + name)
    return df.dropDuplicates().na.drop('all')



In [0]:
#customer_df

#to show the number of rows and cols in customer_df
num_rows = customer_df.count()
num_cols = len(customer_df.columns)
display(f"Number of rows: {num_rows}, Number of columns: {num_cols}")

customer_df = cleaning_duplicates_na(customer_df,"customer dataframe")
customer_df.printSchema()
customer_df.show()

#to show the number of rows and cols in customer_df
num_rows = customer_df.count()
num_cols = len(customer_df.columns)
display(f"Number of rows: {num_rows}, Number of columns: {num_cols}")

'Number of rows: 99441, Number of columns: 5'

Cleaning customer dataframe
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|5e7aa73120cbc53fa...|cd8738980c3332339...|                   23810|             itaguai|            RJ|
|1fda81133d4f067f7...|31973584858337356...|                   30170|      belo horizonte|            MG|
|ba4ec83bdd2a861c3...|8479ef9838c9bca6f...|                   15460|                icem|            SP|
|e8a2dab1e442d28c1...|43fcee98a4442ab2f...|                   66093|               belem|      

'Number of rows: 99441, Number of columns: 5'

In [0]:
#orders_df
#changing the data type of columns
#we won't be dropping duplicates here as orders can be repeated
orders_df.printSchema()

#to show the number of rows and cols in customer_df
num_rows = orders_df.count()
num_cols = len(orders_df.columns)
display(f"Number of rows: {num_rows}, Number of columns: {num_cols}")

#converting column datatypes
orders_df = orders_df.withColumn('order_purchase_timestamp', F.to_date('order_purchase_timestamp')).withColumn\
    ('order_delivered_carrier_date', F.to_date('order_delivered_carrier_date')).withColumn\
        ('order_approved_at',F.to_date('order_approved_at')).withColumn\
            ('order_delivered_customer_date',F.to_date('order_delivered_customer_date')).withColumn\
                ('order_estimated_delivery_date',F.to_date('order_estimated_delivery_date'))

orders_df.printSchema()

#adding a column which contains only the year and month
orders_df = orders_df.withColumn("order_year_month", F.date_format(F.col("order_purchase_timestamp"), format="y-M"))

#adding a column where delivery and time delays are calculated
orders_df = orders_df.withColumn("actual_delivery_time", F.datediff("order_delivered_customer_date", "order_purchase_timestamp"))
orders_df = orders_df.withColumn("estimated_delivery_time", F.datediff("order_estimated_delivery_date", "order_purchase_timestamp"))
orders_df =orders_df.withColumn("Delay Time", F.col("actual_delivery_time") - F.col("estimated_delivery_time"))

orders_df.printSchema()
orders_df.show()
# display(orders_df)

#to show the number of rows and cols in customer_df
num_rows = orders_df.count()
num_cols = len(orders_df.columns)
display(f"Number of rows: {num_rows}, Number of columns: {num_cols}")

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



'Number of rows: 99441, Number of columns: 8'

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: date (nullable = true)
 |-- order_delivered_carrier_date: date (nullable = true)
 |-- order_delivered_customer_date: date (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: date (nullable = true)
 |-- order_delivered_carrier_date: date (nullable = true)
 |-- order_delivered_customer_date: date (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)
 |-- order_year_month: string (nullable = true)
 |-- actual_delivery_time: integer (nullable = true)
 |-- estimated_delivery_time: integer (nullable = true)
 |-- Delay Time: integer

'Number of rows: 99441, Number of columns: 12'

In [0]:
#order_payments_df

#to show the number of rows and cols in customer_df
num_rows = order_payments_df.count()
num_cols = len(order_payments_df.columns)
display(f"Number of rows: {num_rows}, Number of columns: {num_cols}")


order_payments_df = cleaning_duplicates_na(order_payments_df,"payments for orders")
order_payments_df.printSchema()
order_payments_df.show()


#to show the number of rows and cols in customer_df
num_rows = order_payments_df.count()
num_cols = len(order_payments_df.columns)
display(f"Number of rows: {num_rows}, Number of columns: {num_cols}")

'Number of rows: 103886, Number of columns: 5'

Cleaning payments for orders
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|ab85e3c92c828eb80...|                 1| credit_card|                  10|   357.970001|
|e95a9cf427d600b90...|                 1| credit_card|                   4|   81.4700012|
|7aad0379bac4fe5dc...|                 1|      boleto|                   1|   51.0099983|
|049446a8732fca98a...|                 1|      boleto|                   1|   66.0100021|
|1475f5ec3a992c94a...|                 1|     voucher|                   1|   63.4199982|
|7d7a6cf2359

'Number of rows: 103886, Number of columns: 5'

# Data Joining

![](https://i.imgur.com/HRhd2Y0.png)

In [0]:
orders_cutomers_df = orders_df.join(customer_df, orders_df.customer_id == customer_df.customer_id,"left")

orders_cutomers_payments_df = orders_cutomers_df.join(order_payments_df, orders_cutomers_df.order_id == order_payments_df.order_id,"left")

orders_cutomers_payments_items_df = orders_cutomers_payments_df.join(order_items_df,"order_id","left")

orders_cutomers_payments_items_products_df = orders_cutomers_payments_items_df.join(products_df, orders_cutomers_payments_items_df.product_id == products_df.product_id,"left")

final_df = orders_cutomers_payments_items_products_df.join(sellers_df, orders_cutomers_payments_items_products_df.seller_id == sellers_df.seller_id,"left")

# Combining Combined df with the Mongo Db data

In [0]:
finaldf = final_df.join(spark_mongo_df, final_df.product_category_name == spark_mongo_df.product_category_name_english,"left")

In [0]:
#Dropping duplicate columns if there are any
def remove_duplicate_columns(df):
    columns=df.columns
    unique_cols = set()
    columns_to_drop = []

    for col in columns:
        if col in unique_cols:
            columns_to_drop.append(col)
        else:
            unique_cols.add(col)

    df_cleaned=df.drop(*columns_to_drop)

    return df_cleaned

final_cleaned_df = remove_duplicate_columns(final_df)

In [0]:
display(final_cleaned_df)

order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_year_month,actual_delivery_time,estimated_delivery_time,Delay Time,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,payment_sequential,payment_type,payment_installments,payment_value,order_item_id,shipping_limit_date,price,freight_value,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,seller_zip_code_prefix,seller_city,seller_state
delivered,2017-10-02,2017-10-02,2017-10-04,2017-10-10,2017-10-18,2017-10,8.0,16,-8.0,7c396fd4830fd04220f754e42b4e5bff,3149,sao paulo,SP,2,voucher,1,18.5900002,1.0,2017-10-06T11:07:15Z,29.99,8.72,utilidades_domesticas,40.0,268.0,4.0,500.0,19.0,8.0,13.0,9350.0,maua,SP
delivered,2018-07-24,2018-07-26,2018-07-26,2018-08-07,2018-08-13,2018-7,14.0,20,-6.0,af07308b275d755c9edb36a90c618231,47813,barreiras,BA,1,boleto,1,141.460007,1.0,2018-07-30T03:24:27Z,118.7,22.76,perfumaria,29.0,178.0,1.0,400.0,19.0,13.0,19.0,31570.0,belo horizonte,SP
delivered,2018-08-08,2018-08-08,2018-08-08,2018-08-17,2018-09-04,2018-8,9.0,27,-18.0,3a653a41f6f9fc3d2a113cf8398680e8,75265,vianopolis,GO,1,credit_card,3,179.119995,1.0,2018-08-13T08:55:23Z,159.9,19.22,automotivo,46.0,232.0,1.0,420.0,24.0,19.0,21.0,14840.0,guariba,SP
delivered,2017-11-18,2017-11-18,2017-11-22,2017-12-02,2017-12-15,2017-11,14.0,27,-13.0,7c142cf63193a1473d2e66489a9ae977,59296,sao goncalo do amarante,RN,1,credit_card,1,72.1999969,1.0,2017-11-23T19:45:59Z,45.0,27.2,pet_shop,59.0,468.0,3.0,450.0,30.0,10.0,20.0,31842.0,belo horizonte,MG
delivered,2018-02-13,2018-02-13,2018-02-14,2018-02-16,2018-02-26,2018-2,3.0,13,-10.0,72632f0f9dd73dfee390c9b22eb56dd6,9195,santo andre,SP,1,credit_card,1,28.6200008,1.0,2018-02-19T20:31:37Z,19.9,8.72,papelaria,38.0,316.0,4.0,250.0,51.0,15.0,15.0,8752.0,mogi das cruzes,SP
delivered,2017-07-09,2017-07-09,2017-07-11,2017-07-26,2017-08-01,2017-7,17.0,23,-6.0,80bb27c7c16e8f973207a5086ab329e2,86320,congonhinhas,PR,1,credit_card,6,175.259995,1.0,2017-07-13T22:10:13Z,147.9,27.36,automotivo,49.0,608.0,1.0,7150.0,65.0,10.0,65.0,7112.0,guarulhos,SP
invoiced,2017-04-11,2017-04-13,,,2017-05-09,2017-4,,28,,36edbb3fb164b1f16485364b6fb04c73,98900,santa rosa,RS,1,credit_card,1,65.9499969,1.0,2017-04-19T13:25:17Z,49.9,16.05,,,,,600.0,35.0,35.0,15.0,5455.0,sao paulo,SP
delivered,2017-05-16,2017-05-16,2017-05-22,2017-05-26,2017-06-07,2017-5,10.0,22,-12.0,932afa1e708222e5821dac9cd5db4cae,26525,nilopolis,RJ,1,credit_card,3,75.1600037,1.0,2017-05-22T13:22:11Z,59.99,15.17,automotivo,59.0,956.0,1.0,50.0,16.0,16.0,17.0,12940.0,atibaia,SP
delivered,2017-01-23,2017-01-25,2017-01-26,2017-02-02,2017-03-06,2017-1,10.0,42,-32.0,39382392765b6dc74812866ee5ee92a7,99655,faxinalzinho,RS,1,boleto,1,35.9500008,1.0,2017-01-27T18:29:09Z,19.9,16.05,moveis_decoracao,41.0,432.0,2.0,300.0,35.0,35.0,15.0,13720.0,sao jose do rio pardo,SP
delivered,2017-07-29,2017-07-29,2017-08-10,2017-08-16,2017-08-23,2017-7,18.0,25,-7.0,299905e3934e9e181bfb2e164dd4b4f8,18075,sorocaba,SP,2,voucher,1,161.419998,1.0,2017-08-11T12:05:32Z,149.99,19.77,moveis_escritorio,45.0,527.0,1.0,9750.0,42.0,41.0,42.0,8577.0,itaquaquecetuba,SP


Databricks visualization. Run in Databricks to view.

# Storing Data

In [0]:
final_cleaned_df.write.mode("overwrite").parquet("abfss://olistdata@ecomdatastorageaccount.dfs.core.windows.net/Silver/")