In [None]:
#Load, Transform, Persist Pipeline

#1-mount the data lakes
#2-load the CSVs from the landing data lake
#3-convert the CSVs to Parquet and move them to the processing data lake
#4-create a SQL database
#5-create tables based on the Parquet files
#6-specific analyses will be moved to the curated data lake and then loaded into SQL tables
#7-the Power BI application reads directly from the SQL tables through the Databricks REST API service


# Mounting Data lakes

In [None]:
# Unmount /mnt/landing only if it is currently mounted. Mounting onto an existing mount
# point fails, while an unconditional unmount raises on a fresh cluster where nothing is
# mounted yet -- the guard makes this cell safe in both situations.
if any(mount.mountPoint == "/mnt/landing" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/landing")


In [None]:
# Service-principal (OAuth client-credentials) settings reused by every ADLS Gen2 mount in
# this notebook. The client secret is pulled from the "olist_scope" secret scope.
# NOTE(review): the client ID and tenant ID are hardcoded here; consider moving them to the
# secret scope as well.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "036963a7-5381-48c4-beca-207f2199bbfb",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="olist_scope", key="olist-secret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/1cc86953-2aef-440d-b5d5-2826e03c32c0/oauth2/token",
}

# Mount the root of the "landing" container at /mnt/landing.
dbutils.fs.mount(
    mount_point="/mnt/landing",
    source="abfss://landing@oliststorageaccount2.dfs.core.windows.net/",
    extra_configs=configs,
)


In [None]:
# List the files in the landing mount as a rich table (the bare return value only shows a list repr).
display(dbutils.fs.ls("/mnt/landing/"))



In [None]:
# Unmount /mnt/processing only if it is currently mounted: mounting onto an existing mount
# point fails ("device already mounted"), and an unconditional unmount raises on a fresh
# cluster where nothing is mounted yet.
if any(mount.mountPoint == "/mnt/processing" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/processing")


In [None]:
# Mount the root of the "processing" container at /mnt/processing using the shared OAuth configs.
dbutils.fs.mount(
    mount_point="/mnt/processing",
    source="abfss://processing@oliststorageaccount2.dfs.core.windows.net/",
    extra_configs=configs,
)



In [None]:
# List the files in the processing mount as a rich table.
display(dbutils.fs.ls("/mnt/processing"))



In [None]:
# Unmount /mnt/curated only if it is currently mounted, so this cell works both on a fresh
# cluster and on re-runs (mounting onto an existing mount point fails).
if any(mount.mountPoint == "/mnt/curated" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/curated")



In [None]:
# Mount the root of the "curated" container at /mnt/curated using the shared OAuth configs.
dbutils.fs.mount(
    mount_point="/mnt/curated",
    source="abfss://curated@oliststorageaccount2.dfs.core.windows.net/",
    extra_configs=configs,
)



In [None]:
# List the files in the curated mount as a rich table.
display(dbutils.fs.ls("/mnt/curated"))



# Reading CSVs from the Landing Data Lake into DataFrames

In [None]:
# Load the customers extract from the landing lake. The first row provides column names and
# Spark infers column types (inferSchema costs an extra pass over the file).
df_customers = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .load("/mnt/landing/dbo.olist_customers_dataset.csv")
)

# Quick visual check of the loaded data.
display(df_customers)

In [None]:
# Print the column names and the types Spark inferred for the customers CSV.
df_customers.printSchema()

In [None]:
# Load the remaining landing-lake CSV extracts, one DataFrame per file, using the same reader
# settings as the customers file (header row, inferred types, comma delimiter).
def read_landing_csv(file_name):
    """Read ``/mnt/landing/<file_name>`` as a CSV DataFrame with a header row and schema inference."""
    return (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("delimiter", ",")
        .load(f"/mnt/landing/{file_name}")
    )


df_geolocation = read_landing_csv("dbo.olist_geolocation_dataset.csv")
df_order_items = read_landing_csv("dbo.olist_order_items_dataset.csv")
df_order_payments = read_landing_csv("dbo.olist_order_payments_dataset.csv")
df_order_reviews = read_landing_csv("dbo.olist_order_reviews_dataset.csv")
df_orders = read_landing_csv("dbo.olist_orders_dataset.csv")
df_sellers = read_landing_csv("dbo.olist_sellers_dataset.csv")
df_product_category_name_translation = read_landing_csv("dbo.product_category_name_translation.csv")



# Create SQL Temp Views

In [None]:
# Expose the customers DataFrame to %sql cells under the session-scoped name "customers_tempview".
df_customers.createOrReplaceTempView("customers_tempview")


In [None]:
%sql
-- Preview the customers data through the temp view registered above.
SELECT * FROM customers_tempview

# Create SQL Database

In [None]:
%sql
-- Metastore database that holds every table created in this notebook (no-op if it already exists).
CREATE SCHEMA IF NOT EXISTS customers_db

# Create SQL Tables

In [None]:
%sql
-- Drop the CSV-backed tables so the CREATE TABLE IF NOT EXISTS statements that follow
-- re-register them against the current landing files. Each DROP is a no-op when the
-- table does not exist yet.
DROP TABLE IF EXISTS customers_db.customers;
DROP TABLE IF EXISTS customers_db.geolocation;
DROP TABLE IF EXISTS customers_db.order_items;
DROP TABLE IF EXISTS customers_db.order_payments;
DROP TABLE IF EXISTS customers_db.order_reviews;
DROP TABLE IF EXISTS customers_db.orders;
DROP TABLE IF EXISTS customers_db.sellers;
DROP TABLE IF EXISTS customers_db.product_category_name_translation;


In [None]:
%sql
-- Register one CSV-backed table per landing-lake file. The header row supplies column
-- names and inferSchema derives column types when the table is read.
CREATE TABLE IF NOT EXISTS customers_db.customers
USING CSV
LOCATION '/mnt/landing/dbo.olist_customers_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.geolocation
USING CSV
LOCATION '/mnt/landing/dbo.olist_geolocation_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.order_items
USING CSV
LOCATION '/mnt/landing/dbo.olist_order_items_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.order_payments
USING CSV
LOCATION '/mnt/landing/dbo.olist_order_payments_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.order_reviews
USING CSV
LOCATION '/mnt/landing/dbo.olist_order_reviews_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.orders
USING CSV
LOCATION '/mnt/landing/dbo.olist_orders_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.sellers
USING CSV
LOCATION '/mnt/landing/dbo.olist_sellers_dataset.csv'
OPTIONS (header "true", inferSchema "true");

CREATE TABLE IF NOT EXISTS customers_db.product_category_name_translation
USING CSV
LOCATION '/mnt/landing/dbo.product_category_name_translation.csv'
OPTIONS (header "true", inferSchema "true");


In [None]:
%sql
-- Sanity check: number of rows in the customers table. (The previous query referenced a
-- non-existent column named "customer"; COUNT(*) counts every row.)
SELECT COUNT(*) AS customer_count FROM customers_db.customers



In [None]:
%sql
-- Preview the CSV-backed customers table.
SELECT * FROM customers_db.customers

In [None]:
%sql
-- Show the column names and types of the customers table.
DESCRIBE TABLE customers_db.customers

In [None]:
# Load the metastore customers table back into a DataFrame for the filtering steps below.
df_customers_SQL = spark.read.table("customers_db.customers")
display(df_customers_SQL)

# Filtering the DataSet

In [None]:
# List every distinct customer_state value. `.show()` prints only 20 rows by default and
# could silently truncate this list, so render it with display(), sorted for readability.
display(df_customers_SQL.select("customer_state").distinct().orderBy("customer_state"))

In [None]:
from pyspark.sql.functions import col

# Keep only rows whose customer_state is "RJ". Note: this rebinds df_customers_SQL, so later
# cells (and re-runs of earlier ones) see only the filtered rows.
df_customers_SQL = df_customers_SQL.where(col("customer_state") == "RJ")

In [None]:
# Show df_customers_SQL after the customer_state == "RJ" filter.
display(df_customers_SQL)

# Write Full Parquet Datasets to Processing Data lake

In [None]:
# Persist every landing DataFrame as Parquet in the processing lake, replacing earlier runs.
# Keys are the output base names; insertion order matches the original write order.
processing_outputs = {
    "customers": df_customers,
    "geolocation": df_geolocation,
    "order_items": df_order_items,
    "order_payments": df_order_payments,
    "order_reviews": df_order_reviews,
    "orders": df_orders,
    "sellers": df_sellers,
    "product_category_name_translation": df_product_category_name_translation,
}

for output_name, frame in processing_outputs.items():
    frame.write.mode("overwrite").parquet(f"/mnt/processing/{output_name}.parquet")


# Write Filtered Parquet to Processing Data Lake

In [None]:
# Persist the RJ-filtered customers as Parquet in the processing lake, replacing earlier output.
df_customers_SQL.write.parquet("/mnt/processing/customers_RJ.parquet", mode="overwrite")


In [None]:
# Read the RJ Parquet output back to verify the write.
df_customers_parq = spark.read.format("parquet").load("/mnt/processing/customers_RJ.parquet")
display(df_customers_parq)

In [None]:
# Query the RJ Parquet data through a temp view. The state predicate is effectively a
# consistency check: customers_RJ.parquet was written from already-filtered data.
df_customers_parq.createOrReplaceTempView("CustomersParquetTable")
custparkSQL = spark.table("CustomersParquetTable").where("customer_state = 'RJ'")
display(custparkSQL)

# Create SQL Tables based on Parquet files at Processing Data Lake

In [None]:
%sql
-- Register a Parquet-backed table over each full dataset written to the processing lake.
-- Parquet files carry their own schema, so the CSV-only header/inferSchema options are not used.
CREATE TABLE IF NOT EXISTS customers_db.customers_pqt USING PARQUET OPTIONS (path "/mnt/processing/customers.parquet");
CREATE TABLE IF NOT EXISTS customers_db.geolocation_pqt USING PARQUET OPTIONS (path "/mnt/processing/geolocation.parquet");
CREATE TABLE IF NOT EXISTS customers_db.order_items_pqt USING PARQUET OPTIONS (path "/mnt/processing/order_items.parquet");
CREATE TABLE IF NOT EXISTS customers_db.order_payments_pqt USING PARQUET OPTIONS (path "/mnt/processing/order_payments.parquet");
CREATE TABLE IF NOT EXISTS customers_db.order_reviews_pqt USING PARQUET OPTIONS (path "/mnt/processing/order_reviews.parquet");
CREATE TABLE IF NOT EXISTS customers_db.orders_pqt USING PARQUET OPTIONS (path "/mnt/processing/orders.parquet");
CREATE TABLE IF NOT EXISTS customers_db.sellers_pqt USING PARQUET OPTIONS (path "/mnt/processing/sellers.parquet");
CREATE TABLE IF NOT EXISTS customers_db.product_category_name_translation_pqt USING PARQUET OPTIONS (path "/mnt/processing/product_category_name_translation.parquet");

-- The Parquet files under these paths are rewritten with mode("overwrite") on every run,
-- so invalidate any cached metadata/file listing for tables that already existed.
REFRESH TABLE customers_db.customers_pqt;
REFRESH TABLE customers_db.geolocation_pqt;
REFRESH TABLE customers_db.order_items_pqt;
REFRESH TABLE customers_db.order_payments_pqt;
REFRESH TABLE customers_db.order_reviews_pqt;
REFRESH TABLE customers_db.orders_pqt;
REFRESH TABLE customers_db.sellers_pqt;
REFRESH TABLE customers_db.product_category_name_translation_pqt;


In [None]:
%sql
-- Preview the full customers Parquet table.
SELECT * FROM customers_db.customers_pqt


In [None]:
%sql
-- Filtered parquet: RJ-only customers.
CREATE TABLE IF NOT EXISTS customers_db.customers_RJ_pqt USING PARQUET OPTIONS (path "/mnt/processing/customers_RJ.parquet")



In [None]:
%sql
-- Invalidate cached metadata/file listing for the RJ Parquet table, whose files are rewritten with mode("overwrite").
REFRESH TABLE customers_db.customers_RJ_pqt

In [None]:
%sql
-- Preview the RJ-only Parquet table.
SELECT * FROM customers_db.customers_RJ_pqt

In [None]:
# Re-read the RJ Parquet output and query it through a second temp view, filtered by state.
df_customers_parq = spark.read.format("parquet").load("/mnt/processing/customers_RJ.parquet")
df_customers_parq.createOrReplaceTempView("CustomersParquetTableByState")
df_customers_by_state_parq = spark.table("CustomersParquetTableByState").filter("customer_state='RJ'")
display(df_customers_by_state_parq)

In [None]:
# Show the RJ customers read back from /mnt/processing/customers_RJ.parquet.
display(df_customers_parq)

# Write processed CSVs to Curated Data Lake

In [None]:
# Export the RJ customers to the curated lake as comma-separated CSV with a header row,
# replacing any earlier export. Spark writes a directory of part files at this path.
df_customers_parq.write.csv(
    "/mnt/curated/customers_RJ.csv",
    mode="overwrite",
    header=True,
    sep=",",
)

# Test Reading CSV file located at Curated Data Lake

In [None]:
# Read the curated RJ export back (header row, inferred types) to confirm it round-trips.
df_RJ = spark.read.csv(
    "/mnt/curated/customers_RJ.csv",
    header=True,
    inferSchema=True,
    sep=",",
)

display(df_RJ)

In [None]:
%sql
-- Filtered curated CSV: register the RJ export in the curated lake as a SQL table.
CREATE TABLE IF NOT EXISTS customers_db.customers_RJ_csv
USING CSV
OPTIONS (header "true", inferSchema "true")
LOCATION '/mnt/curated/customers_RJ.csv'


In [None]:
%sql
-- Invalidate cached metadata/file listing for the curated CSV table, whose files are rewritten with mode("overwrite").
REFRESH TABLE customers_db.customers_RJ_csv

In [None]:
%sql
-- Preview the curated RJ CSV table.
SELECT * FROM customers_db.customers_RJ_csv