
### Configure the connection between Azure Data Lake Storage (ADLS) and Databricks

In [0]:
# Identifiers of the ADLS Gen2 storage account and of the Azure AD service
# principal (application/client id and directory/tenant id) used to reach it.
storage_account = "olistdatastorageaccountd"
application_id = "615d8082-bfcb-4fd8-8f69-e1089ea2a457"
directory_id = "39bafab8-b8b9-4cfc-8316-5116be5db767"

# The client secret is read from a Databricks secret scope instead of being
# written into the notebook, so it never appears in source control or exports.
# NOTE(review): the scope and key names are placeholders — create them (or
# point at the existing ones), and rotate the secret that used to be
# hard-coded here, since it has been stored in plain text.
client_secret = dbutils.secrets.get(scope="olist-secrets", key="adls-client-secret")

# Every setting is keyed by the account's DFS endpoint host name.
adls_host = f"{storage_account}.dfs.core.windows.net"

# Authenticate to ADLS with OAuth 2.0 client credentials (service principal).
spark.conf.set(f"fs.azure.account.auth.type.{adls_host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{adls_host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(f"fs.azure.account.oauth2.client.id.{adls_host}", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{adls_host}", client_secret)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{adls_host}",
    f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
)


### Validate the connection by reading a file from the ADLS Bronze folder using Spark

In [0]:
# Smoke test: read one Bronze CSV to confirm the ADLS OAuth settings work.
# The path reuses `storage_account` from the configuration cell rather than
# repeating the account name in a second string literal.
customers_check_path = (
    f"abfss://olistdata@{storage_account}.dfs.core.windows.net"
    "/Bronze/olist_customers_dataset.csv"
)

# header=True takes column names from the first row; inferSchema=True lets
# Spark infer the column types from the data.
customer_df = spark.read.csv(customers_check_path, header=True, inferSchema=True)
display(customer_df)


### Read Bronze-layer raw data into Spark DataFrames

In [0]:
# Root of the Bronze folder in ADLS; every file below is read from it.
base_path = "abfss://olistdata@olistdatastorageaccountd.dfs.core.windows.net/Bronze/"

# Full path of each raw CSV file.
# NOTE(review): the payments file is the only one without the "_dataset"
# suffix — confirm this matches the actual file name in the container.
orders_path = f"{base_path}olist_orders_dataset.csv"
payments_path = f"{base_path}olist_order_payments.csv"
reviews_path = f"{base_path}olist_order_reviews_dataset.csv"
items_path = f"{base_path}olist_order_items_dataset.csv"
customers_path = f"{base_path}olist_customers_dataset.csv"
sellers_path = f"{base_path}olist_sellers_dataset.csv"
geolocation_path = f"{base_path}olist_geolocation_dataset.csv"
products_path = f"{base_path}olist_products_dataset.csv"

# Reader options shared by every file: the first row holds the column names
# and the column types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}

orders_df = spark.read.csv(orders_path, **csv_options)
payments_df = spark.read.csv(payments_path, **csv_options)
reviews_df = spark.read.csv(reviews_path, **csv_options)
items_df = spark.read.csv(items_path, **csv_options)
customers_df = spark.read.csv(customers_path, **csv_options)
sellers_df = spark.read.csv(sellers_path, **csv_options)
geolocation_df = spark.read.csv(geolocation_path, **csv_options)
products_df = spark.read.csv(products_path, **csv_options)


### Validate the MongoDB connection

In [0]:
# importing modules
from urllib.parse import quote_plus

from pymongo import MongoClient

# Connection details of the hosted MongoDB instance.
hostname = "q9g8j.h.filess.io"
database = "olistDataNoSQL_thencheese"
port = "61004"
username = "olistDataNoSQL_thencheese"

# The password is read from a Databricks secret scope instead of being written
# into the notebook.
# NOTE(review): the scope and key names are placeholders — create them (or
# point at the existing ones), and rotate the password that used to be
# hard-coded here, since it has been stored in plain text.
password = dbutils.secrets.get(scope="olist-secrets", key="mongo-password")

# Percent-escape the credentials so characters such as "@", ":" or "/" in a
# user name or password cannot corrupt the connection URI.
uri = (
    "mongodb://" + quote_plus(username) + ":" + quote_plus(password)
    + "@" + hostname + ":" + port + "/" + database
)

# Connect with the port number and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]

# MongoClient connects lazily, so creating it proves nothing about the
# connection. Run a cheap command so this cell fails here, with a clear error,
# when the host or credentials are wrong.
mydatabase.command("ping")
display(mydatabase)



### Read MongoDB data into a Spark DataFrame

In [0]:
import pandas as pd

# Fetch every document of the product_categories collection and load the
# resulting list of documents into a pandas DataFrame.
collection = mydatabase["product_categories"]
documents = list(collection.find())
mongo_data = pd.DataFrame(documents)

# Bare last expression so the notebook renders the frame.
mongo_data

In [0]:
# Drop the _id column before converting to a Spark DataFrame.
# The drop reassigns instead of using inplace=True, and errors="ignore" makes
# it a no-op when _id is already gone, so re-running this cell no longer
# raises KeyError.
mongo_data = mongo_data.drop(columns=["_id"], errors="ignore")
mongo_spark_df = spark.createDataFrame(mongo_data)

### Cleaning Data
#### Basic cleaning steps:
1. Removing duplicates
2. Converting data types
3. Adding new columns derived from other columns

In [0]:
# import basic functions for the cleaning and transformation
from pyspark.sql.functions import col, to_date, datediff, current_date

In [0]:
# Function to remove duplicate rows and rows containing nulls from a dataset
def clean_dataFrame(df, name):
    """Return `df` without duplicate rows and without rows that contain nulls.

    Args:
        df: DataFrame to clean; it must provide `dropDuplicates()` and
            `na.drop()`.
        name: Label of the dataset, used only in the progress message.

    Returns:
        The result of `df.dropDuplicates().na.drop()`.
    """
    progress_message = "Cleaning:" + name
    print(progress_message)

    deduplicated = df.dropDuplicates()
    return deduplicated.na.drop()

# Clean the orders dataset (duplicate rows and rows containing nulls are
# removed by clean_dataFrame) and preview the result.
orders_df = clean_dataFrame(df=orders_df, name="orders")
display(orders_df)


In [0]:
# Convert the order date columns to the date datatype with to_date.
# Each column is replaced in place (same name), so later cells keep using the
# original column names.
date_columns = [
    "order_purchase_timestamp",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]
for date_column in date_columns:
    orders_df = orders_df.withColumn(date_column, to_date(date_column))

display(orders_df)

In [0]:
# Calculate delivery durations and the delivery delay, in days.
#   actual_delivery_time    = delivered date - purchase date
#   estimated_delivery_time = estimated date - purchase date
#   delay Time              = actual - estimated
# NOTE(review): "delay Time" contains a space; some writers (e.g. Parquet or
# Delta) may reject such column names — confirm before persisting this frame.
purchase_date = col("order_purchase_timestamp")

orders_df = (
    orders_df
    .withColumn(
        "actual_delivery_time",
        datediff(col("order_delivered_customer_date"), purchase_date),
    )
    .withColumn(
        "estimated_delivery_time",
        datediff(col("order_estimated_delivery_date"), purchase_date),
    )
    .withColumn(
        "delay Time",
        col("actual_delivery_time") - col("estimated_delivery_time"),
    )
)

display(orders_df)

### Joining multiple dataframes to form a single dataframe



In [0]:
# Join the Bronze datasets into one wide DataFrame, starting from the cleaned
# orders and left-joining customers, payments, items, products and sellers.
#
# Every join is made on the key column *name* rather than on an equality
# expression. An expression join keeps the key from both sides, which left
# final_df with two customer_id, order_id, product_id and seller_id columns
# that could not be referenced by name. A name-based join keeps one copy.
#
# NOTE(review): only orders_df went through clean_dataFrame in this notebook;
# the other DataFrames are joined as read.
# NOTE(review): if payments_df and items_df can each hold several rows per
# order_id, joining both multiplies the rows of that order — confirm this is
# intended before aggregating amounts from final_df.

orders_customers_df = orders_df.join(customers_df, on="customer_id", how="left")

orders_payments_df = orders_customers_df.join(payments_df, on="order_id", how="left")

orders_items_df = orders_payments_df.join(items_df, on="order_id", how="left")

orders_items_products_df = orders_items_df.join(products_df, on="product_id", how="left")

final_df = orders_items_products_df.join(sellers_df, on="seller_id", how="left")



In [0]:
# Preview the DataFrame produced by joining orders, customers, payments,
# items, products and sellers.
display(final_df)

In [0]:
# List the column names of the joined DataFrame.
final_df.columns

In [0]:
# Left-join the MongoDB product-category data onto the joined DataFrame,
# matching rows on product_category_name.
# NOTE: final_df is overwritten with the result, so re-running this cell
# joins the MongoDB columns a second time.
final_df = final_df.join(
    mongo_spark_df,
    on="product_category_name",
    how="left",
)

In [0]:
# Preview the DataFrame after the join with the MongoDB data.
display(final_df)

In [0]:
# List the column names after the join with the MongoDB data.
final_df.columns