In [0]:
spark.conf.set("fs.azure.account.auth.type.oliststgacc.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.oliststgacc.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.oliststgacc.dfs.core.windows.net", "f1ed4ff5-c8f7-4a5b-83cf-3fb45ed5573e")
spark.conf.set("fs.azure.account.oauth2.client.secret.oliststgacc.dfs.core.windows.net", "xpW8Q~5o0FWZarPXnG7wxgTdKNbvANHp6O.~8cnN")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.oliststgacc.dfs.core.windows.net", "https://login.microsoftonline.com/ed570910-d325-4362-a77a-1441a44f0cf3/oauth2/token")

Loading Data

In [0]:
base_path = "abfss://rawdata@oliststgacc.dfs.core.windows.net/Olist/"
order_payments = base_path + "`olist_order_payments`.csv"
customers = base_path + "olist_customers_dataset.csv"
geolocation = base_path + "olist_geolocation_dataset.csv"
order_items = base_path + "olist_order_items_dataset.csv"
order_reviews = base_path + "olist_order_reviews_dataset.csv"
orders = base_path + "olist_orders_dataset.csv"
products = base_path + "olist_products_dataset.csv"
sellers = base_path + "olist_sellers_dataset.csv"

In [0]:
order_payments_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(order_payments)
customers_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(customers)
geolocation_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(geolocation)
order_items_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(order_items)
order_reviews_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(order_reviews)
orders_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(orders)
products_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(products)
sellers_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(sellers)

Now we wil use pymongo to connect with MongoDB Server

In [0]:
import pymongo

In [0]:
from pymongo import MongoClient

In [0]:
# importing module
from pymongo import MongoClient

hostname = "hx14l.h.filess.io"
database = "olistnosqldb_previousat"
port = "27018"
username = "olistnosqldb_previousat"
password = "a057bbdff41bc00ff29eddb57943d02f697dc61b"

uri = "mongodb://" + username + ":" + password + "@" + hostname + ":" + port + "/" + database

# Connect with the portnumber and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]
mydatabase

Database(MongoClient(host=['hx14l.h.filess.io:27018'], document_class=dict, tz_aware=False, connect=True), 'olistnosqldb_previousat')

In [0]:
import pandas as pd
collection = mydatabase['product_category_name']
product_category_df = pd.DataFrame(list(collection.find()))

In [0]:
product_category_df.dtypes

_id                              object
product_category_name            object
product_category_name_english    object
dtype: object

In [0]:
product_category_df["_id"] = product_category_df["_id"].astype(str)

In [0]:
# The data was loaded as Pandas dataframe but we are converting it into Spark dataframe
product_category_sdf = spark.createDataFrame(product_category_df)

Removing Null Values

In [0]:
from pyspark.sql.functions import col,to_date,datediff,current_date

In [0]:
def clean_dataframe(df,name):
    print("Cleaning dataframe: {name}")
    return df.dropDuplicates().na.drop('all')

In [0]:
dataframes = {
    "order_payments_df": order_payments_df,
    "customers_df": customers_df,
    "geolocation_df": geolocation_df,
    "order_items_df": order_items_df,
    "order_reviews_df": order_reviews_df,
    "orders_df": orders_df,
    "products_df": products_df,
    "sellers_df": sellers_df,
    "product_category_sdf": product_category_sdf 
}

In [0]:
from pyspark.sql.functions import col, sum

for table_name, df in dataframes.items():
    print(f"\nNull counts for table: {table_name}")
    
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
    ])
    null_counts.show()


Null counts for table: order_payments_df
+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+


Null counts for table: customers_df
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+


Null counts for table: geolocation_df
+---------------------------+---------------+----------

Following tables have null values:
1) order_reviews_df
2) orders_df
3) products_df

In [0]:
dataframes_with_null = {
    "products_df": products_df,
    "order_reviews_df": order_reviews_df,
    "orders_df": orders_df,    
}

# Remove rows with any null values from each DataFrame
for name, df in dataframes_with_null.items():
    print(f"Removing nulls from: {name}")
    cleaned_df = df.dropna(how='any')  # drops rows with any nulls
    dataframes_with_null[name] = cleaned_df

Removing nulls from: products_df
Removing nulls from: order_reviews_df
Removing nulls from: orders_df


In [0]:
products_df = dataframes_with_null["products_df"]
order_reviews_df = dataframes_with_null["order_reviews_df"]
orders_df = dataframes_with_null["orders_df"]

Converting Date Columns into Timestamp Format for all the tables

![Screenshot 2025-06-27 232221.png](./Screenshot 2025-06-27 232221.png "Screenshot 2025-06-27 232221.png")

order_reviews_df tables has data columns with string datatypes we will convert it into timestamp

In [0]:
order_reviews_df.display()

review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
8670d52e15e00043ae7de4c01cc2fe06,b9bf720beb4ab3728760088589c62129,4,recomendo,aparelho eficiente. no site a marca do aparelho esta impresso como 3desinfector e ao chegar esta com outro nome...atualizar com a marca correta uma vez que é o mesmo aparelho,2018-05-22 00:00:00,2018-05-23 16:45:47
3948b09f7c818e2d86c9a546758b2335,e51478e7e277a83743b6f9991dbfa3fb,5,Super recomendo,"Vendedor confiável, produto ok e entrega antes do prazo.",2018-05-23 00:00:00,2018-05-24 03:00:01
373cbeecea8286a2b66c97b1b157ec46,583174fbe37d3d5f0d6661be3aad1786,1,Não chegou meu produto,Péssimo,2018-08-15 00:00:00,2018-08-15 04:10:37
d21bbc789670eab777d27372ab9094cc,4fc44d78867142c627497b60a7e0228a,5,Ótimo,Loja nota 10,2018-07-10 00:00:00,2018-07-11 14:10:25
c92cdd7dd544a01aa35137f901669cdf,37e7875cdce5a9e5b3a692971f370151,4,Muito bom.,"Recebi exatamente o que esperava. As demais encomendas de outros vendedores atrasaram, mas esta chegou no prazo.",2018-06-07 00:00:00,2018-06-09 18:44:02
08c9d79ec0eba1d252e3f52f14b8e6a9,e029f708df3cc108b3264558771605c6,5,Bom,"Recomendo ,",2018-06-13 00:00:00,2018-06-13 22:54:44
b193ff3c9f32a01f3a0d9ae26b94d244,e2e6ee1ed2d7f2f36b05d234983bd7a0,5,Maravilhoso!,"Tô completamente apaixonada, loja super responsável e confiável!",2018-08-10 00:00:00,2018-08-11 00:22:13
86c5cfa7fcbde303f704b60a78ced7d6,a6456e781cb962cc3f412b04de4fed7b,5,Entrega perfeita,Muito bom. muito cheiroso.,2018-06-13 00:00:00,2018-06-14 17:29:03
500c05500aa275953129f49799ee5c73,8a9424899aac432d80d8e580932b5ee9,5,MT lindo,MT lindo,2018-07-25 00:00:00,2018-07-25 21:37:22
109b5ce2dd11bb8460eff3b86da6fefc,25362fbf6aac4b01a28dee1e076acc26,5,Ótimo Produto!,Recomendo o vendedor...,2018-08-17 00:00:00,2018-08-17 21:47:08


In [0]:
from pyspark.sql.functions import to_timestamp, to_date, date_format 


order_reviews_df = order_reviews_df.withColumn("review_creation_date", to_date(col("review_creation_date")))\
    .withColumn("review_answer_timestamp", to_timestamp(col("review_answer_timestamp")))

In [0]:
order_reviews_df.display()

review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
8670d52e15e00043ae7de4c01cc2fe06,b9bf720beb4ab3728760088589c62129,4,recomendo,aparelho eficiente. no site a marca do aparelho esta impresso como 3desinfector e ao chegar esta com outro nome...atualizar com a marca correta uma vez que é o mesmo aparelho,2018-05-22,2018-05-23T16:45:47Z
3948b09f7c818e2d86c9a546758b2335,e51478e7e277a83743b6f9991dbfa3fb,5,Super recomendo,"Vendedor confiável, produto ok e entrega antes do prazo.",2018-05-23,2018-05-24T03:00:01Z
373cbeecea8286a2b66c97b1b157ec46,583174fbe37d3d5f0d6661be3aad1786,1,Não chegou meu produto,Péssimo,2018-08-15,2018-08-15T04:10:37Z
d21bbc789670eab777d27372ab9094cc,4fc44d78867142c627497b60a7e0228a,5,Ótimo,Loja nota 10,2018-07-10,2018-07-11T14:10:25Z
c92cdd7dd544a01aa35137f901669cdf,37e7875cdce5a9e5b3a692971f370151,4,Muito bom.,"Recebi exatamente o que esperava. As demais encomendas de outros vendedores atrasaram, mas esta chegou no prazo.",2018-06-07,2018-06-09T18:44:02Z
08c9d79ec0eba1d252e3f52f14b8e6a9,e029f708df3cc108b3264558771605c6,5,Bom,"Recomendo ,",2018-06-13,2018-06-13T22:54:44Z
b193ff3c9f32a01f3a0d9ae26b94d244,e2e6ee1ed2d7f2f36b05d234983bd7a0,5,Maravilhoso!,"Tô completamente apaixonada, loja super responsável e confiável!",2018-08-10,2018-08-11T00:22:13Z
86c5cfa7fcbde303f704b60a78ced7d6,a6456e781cb962cc3f412b04de4fed7b,5,Entrega perfeita,Muito bom. muito cheiroso.,2018-06-13,2018-06-14T17:29:03Z
500c05500aa275953129f49799ee5c73,8a9424899aac432d80d8e580932b5ee9,5,MT lindo,MT lindo,2018-07-25,2018-07-25T21:37:22Z
109b5ce2dd11bb8460eff3b86da6fefc,25362fbf6aac4b01a28dee1e076acc26,5,Ótimo Produto!,Recomendo o vendedor...,2018-08-17,2018-08-17T21:47:08Z


Data Cleaning

![HRhd2Y0.png](./HRhd2Y0.png "HRhd2Y0.png")

Reference: Kaggle Code - https://www.kaggle.com/code/muhammadarism/data-understanding-cleaning-olist-dataset#Data-Cleaning

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Row

def skimming_data(sdf):
    """
    Generate a summary DataFrame of data characteristics for a Spark DataFrame.
    """
    summary_data = []

    total_rows = sdf.count()
    duplicate_count = total_rows - sdf.dropDuplicates().count()

    for col in sdf.columns:
        data_type = [f.dataType for f in sdf.schema.fields if f.name == col][0]

        null_pct = (sdf.filter(F.col(col).isNull()).count() / total_rows) * 100

        if isinstance(
            data_type,
            (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.ShortType, T.DecimalType)
        ):
            neg_pct = (sdf.filter(F.col(col) < 0).count() / total_rows) * 100
            zero_pct = (sdf.filter(F.col(col) == 0).count() / total_rows) * 100
        else:
            neg_pct = 0.0
            zero_pct = 0.0

        n_unique = sdf.select(col).distinct().count()
        unique_sample = sdf.select(col).distinct().limit(10).rdd.flatMap(lambda x: x).collect()

        summary_data.append(Row(
            feature=col,
            data_type=str(data_type),
            null_value_pct=float(round(null_pct, 3)),
            neg_value_pct=float(round(neg_pct, 3)),
            zero_value_pct=float(round(zero_pct, 3)),
            duplicate_rows=float(duplicate_count),
            n_unique=float(n_unique),
            sample_unique=unique_sample
        ))

    # Explicit schema definition
    schema = T.StructType([
        T.StructField("feature", T.StringType(), True),
        T.StructField("data_type", T.StringType(), True),
        T.StructField("null_value_pct", T.DoubleType(), True),
        T.StructField("neg_value_pct", T.DoubleType(), True),
        T.StructField("zero_value_pct", T.DoubleType(), True),
        T.StructField("duplicate_rows", T.DoubleType(), True),
        T.StructField("n_unique", T.DoubleType(), True),
        T.StructField("sample_unique", T.ArrayType(T.StringType(), True), True)
    ])

    # Convert all sample_unique to strings to avoid mixed types
    clean_data = []
    for row in summary_data:
        clean_row = row.asDict()
        clean_row["sample_unique"] = [str(x) for x in clean_row["sample_unique"]]
        clean_data.append(clean_row)

    result_df = spark.createDataFrame(clean_data, schema=schema)

    return result_df

In [0]:
skimming_data(customers_df).display()

feature,data_type,null_value_pct,neg_value_pct,zero_value_pct,duplicate_rows,n_unique,sample_unique
customer_id,StringType(),0.0,0.0,0.0,0.0,99441.0,"List(e3c7e245a96d7fa339fe6c16f8da4e90, a56b03f5e6015f1a502b9810309b98b7, d0615859a639a94c1fe472eba57d4a7c, c0fe0fbc24994167dce810f83cb96890, 5b5f4957a69d537a2aeadfa7dd2d09d9, 41b200d1ce8675f154c91c2da887bcee, 456c1e01c8ed3b83aa8fc564119bc81a, 8baeca32aac79a831b81f1f8af9fd6d8, a8004a3d658be3bb26c0ad71671ef73f, 860ac166573be76ffb00c3e483892094)"
customer_unique_id,StringType(),0.0,0.0,0.0,0.0,96096.0,"List(861eff4711a542e4b93843c6dd7febb0, e607ede0e63436308660236f5a52da5e, 28da048f094c0c9cbbc5412bcf41b6b0, 212c759d8c4f2d4d9d6fd4c7de0afbb3, f96176e892232662d1c1c5896a94e035, 000c8bdb58a29e7115cfc257230fb21b, 9ccbb5f759db041b2db8359d71c0547f, 4b384b778ebc0449d0244902bfce7beb, 4df43d4c7d3a093a519dbfe0b9dcc0d6, 4be1583defacacfea129170626a62569)"
customer_zip_code_prefix,IntegerType(),0.0,0.0,0.0,0.0,14994.0,"List(20735, 18051, 13840, 12940, 26087, 29285, 70355, 74820, 31035, 7240)"
customer_city,StringType(),0.0,0.0,0.0,0.0,4119.0,"List(camacari, arapiraca, itaberaba, igrejinha, astolfo dutra, itanhaem, brusque, boa vista, cachoeira paulista, guaranta)"
customer_state,StringType(),0.0,0.0,0.0,0.0,27.0,"List(SC, RO, PI, AM, RR, GO, TO, MT, SP, ES)"


In [0]:
skimming_data(geolocation_df).display()

feature,data_type,null_value_pct,neg_value_pct,zero_value_pct,duplicate_rows,n_unique,sample_unique
geolocation_zip_code_prefix,IntegerType(),0.0,0.0,0.0,261831.0,19015.0,"List(1238, 1025, 1127, 1139, 1223, 1005, 1016, 1133, 1212, 1226)"
geolocation_lat,DoubleType(),0.0,99.866,0.0,261831.0,717372.0,"List(-23.54885719288482, -23.545657005828375, -23.548779492642403, -23.54634440022503, -23.522877997201377, -23.532368878152266, -23.521982521784317, -23.532149304749247, -23.529445092705828, -23.53574927338564)"
geolocation_lng,DoubleType(),0.0,100.0,0.0,261831.0,717615.0,"List(-46.632843914149746, -46.63486176165939, -46.64301869646116, -46.644297447833026, -46.642370526920196, -46.6404078985516, -46.63949768858565, -46.626950220930205, -46.639378335968615, -46.650516574383964)"
geolocation_city,StringType(),0.0,0.0,0.0,261831.0,8011.0,"List(sao bernardo do campo, são paulo, sãopaulo, osasco, sao jose dos campos, jundiaí, sp, sao paulo, taboão da serra, sa£o paulo)"
geolocation_state,StringType(),0.0,0.0,0.0,261831.0,27.0,"List(SP, RN, AC, ES, RJ, PI, PB, AL, MG, BA)"


First, we will remove duplicated from this data

In [0]:
geolocation_df = geolocation_df.dropDuplicates()

In [0]:
skimming_data(geolocation_df).display()

feature,data_type,null_value_pct,neg_value_pct,zero_value_pct,duplicate_rows,n_unique,sample_unique
geolocation_zip_code_prefix,IntegerType(),0.0,0.0,0.0,0.0,19015.0,"List(1238, 1025, 1127, 1139, 1223, 1005, 1016, 1133, 1212, 1226)"
geolocation_lat,DoubleType(),0.0,99.841,0.0,0.0,717372.0,"List(-23.54885719288482, -23.545657005828375, -23.548779492642403, -23.54634440022503, -23.522877997201377, -23.532368878152266, -23.521982521784317, -23.532149304749247, -23.529445092705828, -23.53574927338564)"
geolocation_lng,DoubleType(),0.0,100.0,0.0,0.0,717615.0,"List(-46.632843914149746, -46.63486176165939, -46.64301869646116, -46.644297447833026, -46.642370526920196, -46.6404078985516, -46.63949768858565, -46.626950220930205, -46.639378335968615, -46.650516574383964)"
geolocation_city,StringType(),0.0,0.0,0.0,0.0,8011.0,"List(sao bernardo do campo, são paulo, sãopaulo, osasco, sao jose dos campos, jundiaí, sp, sao paulo, taboão da serra, sa£o paulo)"
geolocation_state,StringType(),0.0,0.0,0.0,0.0,27.0,"List(SP, RN, AC, ES, RJ, PI, PB, AL, MG, BA)"


In [0]:
import unicodedata
import re

# Helper function for normalization
def normalize_string(s):
    if s is None:
        return None
    # Normalize accents
    s = unicodedata.normalize('NFKD', s)
    s = ''.join(c for c in s if not unicodedata.combining(c))
    # Lowercase and strip
    s = s.lower().strip()
    # Optionally remove any non-ASCII letters (e.g., punctuation)
    s = re.sub(r'[^a-z0-9\s]', '', s)
    return s

# Register as UDF
normalize_string_udf = F.udf(normalize_string, T.StringType())

# Function to apply cleaning to a column
def clean_city_column(geolocation_df, column_name="geolocation_city", new_column="geolocation_city"):
    """
    Adds a cleaned city column to the DataFrame:
    - Removes accents
    - Converts to lowercase
    - Removes non-ASCII punctuation
    - Strips spaces
    """
    return geolocation_df.withColumn(new_column, normalize_string_udf(F.col(column_name)))


In [0]:
geolocation_df = clean_city_column(geolocation_df)


In [0]:
skimming_data(geolocation_df).display()

feature,data_type,null_value_pct,neg_value_pct,zero_value_pct,duplicate_rows,n_unique,sample_unique
geolocation_zip_code_prefix,IntegerType(),0.0,0.0,0.0,17854.0,19015.0,"List(1238, 1025, 1127, 1139, 1223, 1005, 1016, 1133, 1212, 1226)"
geolocation_lat,DoubleType(),0.0,99.841,0.0,17854.0,717372.0,"List(-23.54885719288482, -23.545657005828375, -23.548779492642403, -23.54634440022503, -23.522877997201377, -23.532368878152266, -23.521982521784317, -23.532149304749247, -23.529445092705828, -23.53574927338564)"
geolocation_lng,DoubleType(),0.0,100.0,0.0,17854.0,717615.0,"List(-46.632843914149746, -46.63486176165939, -46.64301869646116, -46.644297447833026, -46.642370526920196, -46.6404078985516, -46.63949768858565, -46.626950220930205, -46.639378335968615, -46.650516574383964)"
geolocation_city,StringType(),0.0,0.0,0.0,17854.0,5939.0,"List(carapicuiba, sao bernardo do campo, barueri, jundiai, osasco, sao jose dos campos, taboao da serra, sp, saopaulo, sao paulo)"
geolocation_state,StringType(),0.0,0.0,0.0,17854.0,27.0,"List(SP, RN, AC, ES, RJ, PI, PB, AL, MG, BA)"


In [0]:
skimming_data(product_category_sdf).display()

feature,data_type,null_value_pct,neg_value_pct,zero_value_pct,duplicate_rows,n_unique,sample_unique
_id,StringType(),0.0,0.0,0.0,0.0,71.0,"List(685c5b5926b3fa1474b258d7, 685c5b5926b3fa1474b258dd, 685c5b5926b3fa1474b258d5, 685c5b5926b3fa1474b258d8, 685c5b5926b3fa1474b258d4, 685c5b5926b3fa1474b258da, 685c5b5926b3fa1474b258d6, 685c5b5926b3fa1474b258db, 685c5b5926b3fa1474b258d9, 685c5b5926b3fa1474b258dc)"
product_category_name,StringType(),0.0,0.0,0.0,0.0,71.0,"List(moveis_decoracao, beleza_saude, informatica_acessorios, automotivo, relogios_presentes, esporte_lazer, perfumaria, utilidades_domesticas, cama_mesa_banho, telefonia)"
product_category_name_english,StringType(),0.0,0.0,0.0,0.0,71.0,"List(bed_bath_table, watches_gifts, auto, computers_accessories, perfumery, furniture_decor, telephony, health_beauty, housewares, sports_leisure)"


Just replacing _ with ' ' for product_category_name_english

In [0]:
product_category_sdf = product_category_sdf.withColumn(
    "product_category_name_english",
    F.initcap(
        F.regexp_replace("product_category_name_english", "_", " ")
    )
)

Finding new insights from orders_df

In [0]:
orders_df = orders_df.withColumn("actual_delivery_time", datediff("order_delivered_customer_date", "order_purchase_timestamp"))
orders_df = orders_df.withColumn("estimated_delivery_time", datediff("order_estimated_delivery_date", "order_purchase_timestamp"))

Calculating delivery time delays

In [0]:
from pyspark.sql.functions import when 
orders_df = orders_df.withColumn("delay", when(col("actual_delivery_time") > col("estimated_delivery_time"),True).otherwise(False))\
    .withColumn("delay time", col("actual_delivery_time")-col("estimated_delivery_time"))

Writing all the tables into Data lake

In [0]:
order_payments_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/order_payments/")

customers_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/customers/")

geolocation_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/geolocation/")

order_items_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/order_items/")

order_reviews_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/order_reviews/")

orders_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/orders/")

products_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/products/")

sellers_df.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/sellers/")

product_category_sdf.write \
    .mode("append") \
    .option("header", "true") \
    .csv("abfss://transformeddata@oliststgacc.dfs.core.windows.net/Olist/product_category/")