In [0]:
import os

def _require_env(name):
    """Return environment variable ``name``, failing fast when it is unset or empty.

    A missing credential would otherwise be passed to ``spark.conf.set`` as
    ``None`` and only surface later as an opaque configuration/auth error.
    """
    value = os.environ.get(name)
    if not value:
        raise EnvironmentError(f"Required environment variable '{name}' is not set")
    return value


# Service-principal credentials are read from the environment, never hard-coded.
client_id = _require_env('client_id')
tenant_id = _require_env('tenant_id')
client_secret = _require_env('secret_value')
storage_account = "project1azure1"

# Every OAuth setting is keyed by the same storage-account host suffix.
account_host = f"{storage_account}.dfs.core.windows.net"

# Configure OAuth client-credentials authentication for this storage account.
spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{account_host}", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account_host}", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account_host}", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

Reading data from ADLS

In [0]:
# Read the silver-layer category tree (Delta format). The path is built from the
# `storage_account` variable set in the configuration cell, so the account name
# is defined in one place only.
category_tree_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/inventory/category_tree"
df = spark.read.format("delta").load(category_tree_path)
df.display()


categoryid,parentid
1016,213.0
809,169.0
570,9.0
1691,885.0
536,1691.0
231,
542,378.0
1146,542.0
1140,542.0
1479,1537.0


Identifying leaf categories

In [0]:
from pyspark.sql.functions import col, lit, when
from pyspark.sql.types import IntegerType




# IsLeafCategory: 1 when a category is never referenced as a parent, else 0.

# Distinct, non-null parent ids, renamed so the join below has no name clash.
parents_df = (
    df.select("parentid")
      .dropna()
      .distinct()
      .withColumnRenamed("parentid", "distinctparentid")
)

# A left join leaves distinctparentid null exactly for the categories that
# have no children; the helper column is dropped once the flag is derived.
df = (
    df.join(parents_df, on=df["categoryid"] == parents_df["distinctparentid"], how="left")
      .withColumn(
          "IsLeafCategory",
          when(col("distinctparentid").isNotNull(), lit(0)).otherwise(lit(1)),
      )
      .drop("distinctparentid")
)



Identifying Category level

In [0]:
# Collect the (categoryid, parentid) pairs to the driver and index them as a
# plain dict: categoryid -> parentid (None where the category has no parent).
cat_map = {
    row["categoryid"]: row["parentid"]
    for row in df.select("categoryid", "parentid").collect()
}

# Function to calculate level
def get_level(cat_id):
    """Return the 1-based depth of ``cat_id`` in the category tree.

    Follows parent links in the module-level ``cat_map`` (category id ->
    parent id) until reaching an id that is absent from the map or whose
    parent is ``None``. A category without a parent, or an id that is not in
    ``cat_map`` at all, is level 1.

    Raises:
        ValueError: if the parent links form a cycle (including a category
            that is its own parent); without this check the walk would never
            terminate on such data.
    """
    level = 1
    seen = {cat_id}  # ids already visited on this walk, used to detect cycles
    while cat_id in cat_map and cat_map[cat_id] is not None:
        cat_id = cat_map[cat_id]
        if cat_id in seen:
            raise ValueError(f"Cycle detected in category hierarchy at category id {cat_id!r}")
        seen.add(cat_id)
        level += 1
    return level

# Wrap get_level as a Spark UDF that produces an integer column
from pyspark.sql.functions import udf
level_udf = udf(f=get_level, returnType=IntegerType())

# Derive CategoryLevel for every row from its categoryid
category_level = level_udf(col("categoryid"))
df = df.withColumn("CategoryLevel", category_level)




Preparing data to save to ADLS

In [0]:
# Keep only the columns that make up the category dimension.
dim_category = df.select(
    col("categoryid"),
    col("parentid"),
    col("CategoryLevel"),
    col("IsLeafCategory"),
)



Finding the topmost category for each categoryid

In [0]:
# Resolve, for every category, the id of its topmost ancestor (the ancestor
# whose CategoryLevel is 1). Each row starts with itself as the candidate and
# every pass replaces the candidate with its parent until nothing changes.
category_hierarchy = dim_category.select(
    col("categoryid").alias("leaf_categoryid"),
    col("parentid"),
    col("CategoryLevel")
).withColumn("top_categoryid", col("leaf_categoryid"))

# A candidate climbs one level per pass, so the deepest CategoryLevel plus one
# confirming pass bounds the loop. The bound turns inconsistent data (which
# would otherwise spin forever) into an explicit error.
max_level = dim_category.agg({"CategoryLevel": "max"}).collect()[0][0]
max_iterations = (max_level or 1) + 1  # max_level is None for an empty table

for iteration in range(1, max_iterations + 1):
    print(f"Iteration {iteration}")

    # Lookup side: for each category, its parent and its level. Rebuilt on
    # every pass so each join works with freshly aliased columns.
    parent_df = dim_category.select(
        col("categoryid").alias("parent_categoryid"),
        col("parentid").alias("next_parentid"),
        col("CategoryLevel").alias("parent_level")
    )

    joined = category_hierarchy.join(
        parent_df,
        category_hierarchy["top_categoryid"] == parent_df["parent_categoryid"],
        how="left"
    )

    # Stay put once the candidate is a level-1 category; otherwise move to its
    # parent; if the candidate has no known parent, keep it unchanged.
    updated = joined.withColumn(
        "new_top_categoryid",
        when(col("parent_level") == 1, col("parent_categoryid"))
        .when(col("next_parentid").isNotNull(), col("next_parentid"))
        .otherwise(col("top_categoryid"))
    ).drop("parent_categoryid", "next_parentid", "parent_level")

    # `top_categoryid` still holds the previous pass's result, so convergence
    # can be checked on this frame directly, without joining back to a copy
    # of the previous iteration.
    changed = updated.filter(col("new_top_categoryid") != col("top_categoryid"))

    # limit(1) lets Spark stop as soon as a single changed row is found.
    if changed.limit(1).count() == 0:
        print("No changes detected. Hierarchy resolved.")
        break

    category_hierarchy = updated.drop("top_categoryid").withColumnRenamed("new_top_categoryid", "top_categoryid")
else:
    raise RuntimeError(
        f"Category hierarchy did not resolve within {max_iterations} iterations; "
        "check the data for cyclic parent links or inconsistent CategoryLevel values."
    )

category_top_map = category_hierarchy.select("leaf_categoryid", "top_categoryid").dropDuplicates()

# Drop any top_categoryid left over from an earlier run of this cell so that
# re-running it does not produce a duplicate column (no-op on the first run).
dim_category = dim_category.drop("top_categoryid")
dim_category = dim_category.join(category_top_map, dim_category["categoryid"] == category_top_map["leaf_categoryid"], how="left")
dim_category = dim_category.drop("leaf_categoryid")

dim_category.display()


Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
No changes detected. Hierarchy resolved.


categoryid,parentid,CategoryLevel,IsLeafCategory,top_categoryid
1591,1579.0,2,0,1579
148,1110.0,3,1,653
463,250.0,2,1,250
1645,1534.0,4,1,140
1088,287.0,3,1,1600
1580,441.0,4,1,378
471,1368.0,4,1,140
833,901.0,3,1,679
1342,362.0,3,1,431
1238,265.0,4,1,1600


In [0]:
# Persist the category dimension to the gold layer as a Delta table, replacing
# both data and schema. The path is built from the `storage_account` variable
# set in the configuration cell rather than repeating the account name.
dim_category_path = f"abfss://gold@{storage_account}.dfs.core.windows.net/dim_category"
(
    dim_category.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(dim_category_path)
)


