In [None]:
# Echo the SparkSession to confirm a Spark runtime is attached.
# NOTE(review): `spark` is never created in this notebook; it is assumed to be injected by the platform.
spark

In [None]:
import os

# Authenticate the ABFS driver to the ADLS Gen2 account with a service principal
# (OAuth 2.0 client-credentials flow via ClientCredsTokenProvider).
storage_account = "*********"
application_id = "***********************"
directory_id = "************************"

# The client secret must not be committed with the notebook source; supply it through
# the environment instead. NOTE(review): on Databricks, dbutils.secrets.get(scope=..., key=...)
# is the usual alternative — confirm which mechanism the workspace uses.
client_secret = os.environ["AZURE_CLIENT_SECRET"]

# Every ABFS setting is keyed by the account's DFS endpoint host.
account_host = f"{storage_account}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{account_host}", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account_host}", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account_host}", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [None]:
# Quick look at the raw customers CSV in the Bronze container; the first row supplies column names.
customer_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("abfss://olistdata@*********.dfs.core.windows.net/Bronze/olist_customers_dataset.csv")
)

display(customer_df)

In [None]:
# Bronze container holding the raw Olist CSV exports; each dataset path is derived from it.
base_path = "abfss://olistdata@*********.dfs.core.windows.net/Bronze/"
orders_path = f"{base_path}olist_orders_dataset.csv"
payments_path = f"{base_path}olist_order_payments_dataset.csv"
items_path = f"{base_path}olist_order_items_dataset.csv"
reviews_path = f"{base_path}olist_order_reviews_dataset.csv"
customers_path = f"{base_path}olist_customers_dataset.csv"
sellers_path = f"{base_path}olist_sellers_dataset.csv"
# Geolocation is disabled: while this stays commented out, any cell that reads
# `geolocation_path` raises NameError.
# geolocation_path = f"{base_path}olist_geolocation_dataset.csv"
products_path = f"{base_path}olist_products_dataset.csv"

In [None]:
def read_bronze_csv(path):
    """Read one header-bearing CSV file into a Spark DataFrame.

    Args:
        path: Full abfss:// path of the CSV file.

    Returns:
        A Spark DataFrame. No schema inference is requested, so columns are read
        with Spark's CSV default type (string).
    """
    return spark.read.format("csv").option("header", True).load(path)


orders_df = read_bronze_csv(orders_path)
payments_df = read_bronze_csv(payments_path)
items_df = read_bronze_csv(items_path)
reviews_df = read_bronze_csv(reviews_path)
customers_df = read_bronze_csv(customers_path)
sellers_df = read_bronze_csv(sellers_path)
# Geolocation is intentionally not loaded: `geolocation_path` is commented out in the
# path cell, so reading it here raised NameError on a fresh kernel, and no later cell
# used geolocation_df.
products_df = read_bronze_csv(products_path)

## Reading Data from MongoDB (PyMongo)

In [None]:
# importing modules
import os
from urllib.parse import quote_plus

from pymongo import MongoClient
import pandas as pd

hostname = "*********************"
database = "************************"
port = "*****"
username = "************************"
# Read the password from the environment rather than committing it to the notebook.
password = os.environ["MONGO_PASSWORD"]


def build_mongo_uri(user, secret, host, port, db):
    """Build a ``mongodb://`` connection string with percent-escaped credentials.

    Characters such as ``@``, ``:`` and ``/`` in the username or password would
    otherwise be parsed as URI delimiters. PyMongo therefore requires both to be
    escaped with ``urllib.parse.quote_plus``.

    Args:
        user: Database username (unescaped).
        secret: Database password (unescaped).
        host: Server hostname.
        port: Server port, as a string or int.
        db: Database name; it is also used as the URI's default/auth database.

    Returns:
        The connection URI string.
    """
    return f"mongodb://{quote_plus(user)}:{quote_plus(secret)}@{host}:{port}/{db}"


uri = build_mongo_uri(username, password, hostname, port, database)

# Connect with the port number and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]
mydatabase

In [None]:
# Pull every document of the product_categories collection into a pandas DataFrame
# (one row per document).
collection = mydatabase["product_categories"]

mongo_data = pd.DataFrame([document for document in collection.find()])

In [None]:
# Preview the products table loaded from the Bronze CSV.
display(products_df)

In [None]:
# Show the product-category documents fetched from MongoDB (a pandas DataFrame).
mongo_data

### Cleaning the data

In [None]:
from pyspark.sql.functions import col, to_date, datediff, current_date

In [None]:
def clean_dataframe(df, name):
    """Return ``df`` without exact-duplicate rows and without rows that are entirely null.

    Args:
        df: Spark DataFrame to clean.
        name: Human-readable label; used only in the progress message.

    Returns:
        A new DataFrame with duplicates and all-null rows removed.
    """
    print("Cleaning " + name)
    # how="all": drop a row only when every column is null; partially filled rows are kept.
    return df.dropDuplicates().na.drop(how="all")


# Backward-compatible alias for the original (misspelled) name used by existing cells.
clean_datafram = clean_dataframe

# Apply the same duplicate/all-null cleaning to every source frame that feeds the joins
# below, not only orders, so stray duplicate rows cannot be multiplied by the joins.
orders_df = clean_datafram(orders_df, "Orders")
customers_df = clean_datafram(customers_df, "Customers")
payments_df = clean_datafram(payments_df, "Payments")
items_df = clean_datafram(items_df, "Items")
products_df = clean_datafram(products_df, "Products")
sellers_df = clean_datafram(sellers_df, "Sellers")
display(orders_df)

In [None]:
# Convert Date Columns
# Cast the three order timestamps to DateType so the datediff() calls that follow
# count whole calendar days.
orders_df = (
    orders_df
    .withColumn("order_purchase_timestamp", to_date(col("order_purchase_timestamp")))
    .withColumn("order_delivered_customer_date", to_date(col("order_delivered_customer_date")))
    .withColumn("order_estimated_delivery_date", to_date(col("order_estimated_delivery_date")))
)


In [None]:
# Calculate Delivery and Time Delays
from pyspark.sql.functions import when

# Both durations are day counts measured from the purchase date. Their difference,
# "Delay Time", is positive when the order arrived after the estimated date and
# negative when it arrived early.
# NOTE(review): unlike the other derived columns, "Delay Time" contains a space. Older
# Spark Parquet writers may reject such names, so consider renaming it (this would
# change the Silver schema).
orders_df = (
    orders_df
    .withColumn("actual_delivery_time", datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")))
    .withColumn("estimated_delivery_time", datediff(col("order_estimated_delivery_date"), col("order_purchase_timestamp")))
    .withColumn("Delay Time", col("actual_delivery_time") - col("estimated_delivery_time"))
)

display(orders_df)

### Joining

In [None]:
# Denormalise orders by left-joining each related table on its shared key column.
# Passing the key as a column *name* (rather than an equality expression) makes Spark keep
# a single copy of that column, which avoids the "ambiguous reference" error on later steps.
# NOTE(review): payments and items are both joined on order_id. An order with several
# payment rows and several item rows is therefore repeated once per (payment, item) pair,
# so aggregate payments per order before summing payment amounts.
# NOTE(review): reviews_df is loaded earlier but is not joined here.
orders_customers_df = orders_df.join(customers_df, on="customer_id", how="left")            # 1. customers
orders_payments_df = orders_customers_df.join(payments_df, on="order_id", how="left")       # 2. payments
orders_items_df = orders_payments_df.join(items_df, on="order_id", how="left")              # 3. items
orders_items_products_df = orders_items_df.join(products_df, on="product_id", how="left")   # 4. products
final_df = orders_items_products_df.join(sellers_df, on="seller_id", how="left")            # 5. sellers

In [None]:
# Inspect the joined orders/customers/payments/items/products/sellers view before the MongoDB categories are added.
display(final_df)

In [None]:
# Spark cannot map BSON ObjectId values to a column type, so the Mongo `_id` field is
# stringified (in place on the pandas frame) before handing the frame to Spark.
mongo_data["_id"] = mongo_data["_id"].astype("str")
mongo_spark_df = spark.createDataFrame(mongo_data)

In [None]:
# Attach the MongoDB category attributes to each row by product_category_name.
# NOTE(review): this rebinds final_df, so re-running only this cell joins the categories a
# second time and duplicates their columns. Re-run the join cell above first.
final_df = final_df.join(mongo_spark_df, on="product_category_name", how="left")

In [None]:
# Inspect the final dataset, now including the MongoDB category columns, before it is written to Silver.
display(final_df)

In [None]:
# Persist the enriched dataset to the Silver layer as Parquet, replacing any previous output.
final_df.write.parquet(
    "abfss://olistdata@*********.dfs.core.windows.net/Silver",
    mode="overwrite",
)