In [0]:
# Load the four silver-layer tables this notebook works from.
product_df = spark.table("supplychain_catalog.silver.product")
supplier_df = spark.table("supplychain_catalog.silver.supplier")
transportation_df = spark.table("supplychain_catalog.silver.transportation")
purchase_orders_df = spark.table("supplychain_catalog.silver.purchase_orders")

#### Advanced analytics:
Example 1: Calculate the average number of services provided by suppliers

In [0]:
from pyspark.sql.functions import  countDistinct, avg

In [0]:
# Number of distinct business types listed for each supplier company.
supplier_service_count = supplier_df.groupBy('company_name').agg(
    countDistinct('business_type').alias('service_count')
)

# Average of the per-supplier counts: the figure this example sets out to
# report. Previously this step was commented out and never computed.
avg_service_count = supplier_service_count.agg(
    avg('service_count').alias('avg_service_count')
)

display(supplier_service_count)
display(avg_service_count)

Example 2: Identify top suppliers by the number of materials provided

In [0]:
%sql
-- Total units sold (Number_of_products_sold cast to int and summed) per
-- product type / supplier business type / supplier company.
-- NOTE(review): this query reads from the `supplychains` schema, while the
-- Python cells read supplychain_catalog.silver.* -- confirm the intended source.
-- NOTE(review): results are ordered by the grouping columns, not by
-- num_of_prod, so the "top" suppliers are not listed first -- confirm.
select 
p.Product_type,s.Business_Type,s.Company_Name,
sum(cast(Number_of_products_sold as int)) as num_of_prod
from supplychains.purchaseorders po 
inner join supplychains.product  p on p.p_id =po.P_ID
inner join supplychains.supplier s on s.S_ID = po.Supplier_Id
--where business_type = 'Constrution materialsConstruction'
group by p.Product_type,s.Company_Name,s.Business_Type
order by p.Product_type,s.Business_Type,s.Company_Name

Example 3: Rank suppliers based on the total number of services provided

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc,countDistinct,cast,col,sum,avg,coalesce,lit


In [0]:

# Step 1: distinct business types per company.
service_counts = supplier_df.groupBy("Company_Name").agg(
    countDistinct("Business_Type").alias("Service_Count")
)

# Step 2: rank companies by that count, highest first.
# NOTE(review): the window has no partitionBy, so the ranking is global
# across all suppliers.
windowSpec = Window.orderBy(col("Service_Count").desc())
ranked_suppliers = service_counts.withColumn("Rank", rank().over(windowSpec))

ranked_suppliers.show()

In [0]:
# Print the column names and types of the supplier table.
supplier_df.printSchema()

In [0]:
# Purchase-order columns that the aggregations further down treat as numbers.
numeric_columns = [
    "number_of_products_sold",
    "revenue_generated",
    "stock_levels",
    "lead_times",
    "order_quantities",
    "shipping_times",
    "shipping_costs",
    "Total_Cost",
]

# For each listed column: replace it with its double-cast version, then keep
# only the rows where the cast value is not NULL.
# NOTE(review): a row is dropped if ANY one of these columns is NULL after the
# cast (presumably including values that were already NULL, not only
# non-numeric text) -- confirm that this much filtering is intended.
for column in numeric_columns:
    as_double = col(column).cast("double")
    purchase_orders_df = (
        purchase_orders_df
        .withColumn(column, as_double)
        .where(col(column).isNotNull())
    )

In [0]:
# Print the purchase-order schema after the numeric casts above.
purchase_orders_df.printSchema()

In [0]:
# Per-supplier aggregates over the purchase orders. `sum` and `avg` here are
# the pyspark.sql.functions versions imported earlier, not the Python builtins.
supplier_aggregates = [
    sum("number_of_products_sold").alias("Total_Sold"),
    sum("revenue_generated").alias("Total_Revenue"),
    avg("stock_levels").alias("Average_Stock"),
    avg("lead_times").alias("Avg_Lead_Times"),
    sum("order_quantities").alias("Total_Order_Quantities"),
    avg("shipping_times").alias("Avg_Shipping_Times"),
    countDistinct("shipping_carriers").alias("Distinct_Shipping_Carriers"),
    avg("shipping_costs").alias("Avg_Shipping_Costs"),
    sum("Total_Cost").alias("Total_Cost"),
]

# One output row per supplier_id.
supplier_stats_df = purchase_orders_df.groupBy("supplier_id").agg(*supplier_aggregates)

In [0]:
# Attach the per-supplier aggregates to the supplier attributes. The inner
# join keeps only suppliers present on both sides; the supplier table's copy
# of the key is dropped so a single supplier id column remains.
same_supplier = supplier_df["supplier_Id"] == supplier_stats_df["supplier_Id"]
comprehensive_suppliers = (
    supplier_df
    .join(supplier_stats_df, on=same_supplier, how="inner")
    .drop(supplier_df["supplier_Id"])
)

In [0]:
# Print the schema of the joined supplier + aggregates DataFrame.
comprehensive_suppliers.printSchema()

In [0]:
# Recommendation_Score is the plain sum of the metrics listed below. Each one
# is cast to double, and a NULL contributes 0 instead of nulling the total.
# NOTE(review): every metric is added unweighted and with a positive sign,
# including the cost, lead-time, shipping-time and Defect_Quality columns --
# confirm that larger values of those are really meant to raise the score.
score_columns = [
    "Total_Sold",
    "Total_Revenue",
    "Average_Stock",
    "Avg_Lead_Times",
    "Total_Order_Quantities",
    "Avg_Shipping_Times",
    "Avg_Shipping_Costs",
    "Total_Cost",
    "Negotiation_Score",
    "Defect_Quality",
]

# Accumulate the terms left to right. The builtin sum() is shadowed by the
# pyspark import, so an explicit loop is used.
recommendation_score = None
for score_column in score_columns:
    term = coalesce(col(score_column).cast("double"), lit(0))
    recommendation_score = (
        term if recommendation_score is None else recommendation_score + term
    )

comprehensive_suppliers = comprehensive_suppliers.withColumn(
    "Recommendation_Score", recommendation_score
)

In [0]:
# Add a Rank column ordered by Recommendation_Score, highest score first.
# The window has no partitionBy, so all suppliers are ranked together.
windowSpec = Window.orderBy(col("Recommendation_Score").desc())
score_rank = rank().over(windowSpec)
comprehensive_suppliers = comprehensive_suppliers.withColumn("Rank", score_rank)

display(comprehensive_suppliers)

In [0]:
%python
# Publish the ranked suppliers to the gold layer with a single overwrite.
# The previous DROP TABLE + write left a window in which the table did not
# exist, and nothing to fall back to if the write failed. overwriteSchema lets
# the overwrite also replace the stored schema when the DataFrame's columns
# have changed, which is the usual reason for dropping first.
# NOTE(review): assumes the target is a Delta table -- confirm.
(
    comprehensive_suppliers.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("supplychain_catalog.gold.suppliers_recommendation")
)

In [0]:
# Register the DataFrame as a temporary view (not a table) so the %sql cells
# below can query it by the name comprehensive_suppliers_view.
comprehensive_suppliers.createOrReplaceTempView("comprehensive_suppliers_view")

#### Total Revenue by Supplier

In [0]:
%sql
-- Returns every column of the temp view, unaggregated.
-- NOTE(review): presumably the revenue-by-supplier chart is configured in the
-- Databricks visualization attached to this result -- confirm.
select * from comprehensive_suppliers_view

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

#### Suppliers Based on Recommendation Score

In [0]:
%sql
-- Returns every column of the temp view; no ordering by Rank or
-- Recommendation_Score is applied in the query itself.
select * from comprehensive_suppliers_view