In [1]:
import os
os.path.exists("/content/Cloud Warehouse Compersion Chart.csv")


True

In [10]:
from pyspark.sql.functions import col, regexp_extract, when


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Cloud_Warehouse_ETL") \
    .getOrCreate()

df = spark.read.csv(
    "/content/Cloud Warehouse Compersion Chart.csv",
    header=True,
    inferSchema=True
)


In [3]:
df.printSchema()
df.show(10, truncate=False)


root
 |-- index: integer (nullable = true)
 |-- Shiprocket: string (nullable = true)
 |-- Unnamed: 1: string (nullable = true)
 |-- INCREFF: string (nullable = true)

+-----+--------------------------------------------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|index|Shiprocket                                        |Unnamed: 1      |INCREFF                                                                                                                                                                                           |
+-----+--------------------------------------------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |Heads        

In [4]:
from pyspark.sql.functions import col

df_clean = df.select(
    col("Shiprocket").alias("service"),
    col("Unnamed: 1").alias("shiprocket_price_raw"),
    col("INCREFF").alias("increff_price_raw")
)


In [5]:
df_clean = df_clean.filter(
    col("service").isNotNull()
)


In [24]:


df_prices = df_clean \
    .withColumn(
        "shiprocket_price",
        when(
            regexp_extract(col("shiprocket_price_raw"), r"(\d+\.?\d*)", 1) != "",
            regexp_extract(col("shiprocket_price_raw"), r"(\d+\.?\d*)", 1).cast("double")
        ).otherwise(None)
    ) \
    .withColumn(
        "increff_price",
        when(
            regexp_extract(col("increff_price_raw"), r"(\d+\.?\d*)", 1) != "",
            regexp_extract(col("increff_price_raw"), r"(\d+\.?\d*)", 1).cast("double")
        ).otherwise(None)
    )


In [25]:


df_prices = df_clean \
    .withColumn(
        "shiprocket_price",
        regexp_extract(col("shiprocket_price_raw"), r"(\d+\.?\d*)", 1).cast("double")
    ) \
    .withColumn(
        "increff_price",
        regexp_extract(col("increff_price_raw"), r"(\d+\.?\d*)", 1).cast("double")
    )


In [13]:
df_prices.show(truncate=False)


+--------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+----------------+-------------+
|service                                           |shiprocket_price_raw                                                                                                          |increff_price_raw                                                                                                                              |shiprocket_price|increff_price|
+--------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------

In [8]:
df_prices.printSchema()



root
 |-- service: string (nullable = true)
 |-- shiprocket_price_raw: string (nullable = true)
 |-- increff_price_raw: string (nullable = true)
 |-- shiprocket_price: double (nullable = true)
 |-- increff_price: double (nullable = true)



In [14]:
df_prices.filter(
    col("shiprocket_price").isNull() |
    col("increff_price").isNull()
).show()


+--------------------+--------------------+--------------------+----------------+-------------+
|             service|shiprocket_price_raw|   increff_price_raw|shiprocket_price|increff_price|
+--------------------+--------------------+--------------------+----------------+-------------+
|               Heads|    Price (Per Unit)|    Price (Per Unit)|            NULL|         NULL|
|(A) SCOPE OF WORK...|                NULL|                NULL|            NULL|         NULL|
|              Inward|           Unloading|• Boxes counted a...|            NULL|         NULL|
|             Outward|       Order receipt|•  Orders will be...|            NULL|         NULL|
|             Returns|              Inward|•  RTO and Custom...|            NULL|         NULL|
|Inventory management|         Cycle count|•  Daily cycle co...|            NULL|          3.0|
|   (B) MISCELLANEOUS|                NULL|                NULL|            NULL|         NULL|
|        © EXCLUSIONS|                NU

In [26]:


df_kpi_base = df_prices.filter(
    col("shiprocket_price").isNotNull() |
    col("increff_price").isNotNull()
)


In [16]:
df_kpi = df_kpi_base.withColumn(
    "cost_difference",
    col("shiprocket_price") - col("increff_price")
)


In [27]:


df_kpi = df_kpi.withColumn(
    "cheaper_provider",
    when(col("shiprocket_price") < col("increff_price"), "Shiprocket")
    .when(col("shiprocket_price") > col("increff_price"), "Increff")
    .otherwise("Same / NA")
)


In [18]:
avg_cost_kpi = df_kpi.selectExpr(
    "avg(shiprocket_price) as avg_shiprocket_cost",
    "avg(increff_price) as avg_increff_cost"
)


In [19]:
avg_cost_kpi.show()


+-------------------+------------------+
|avg_shiprocket_cost|  avg_increff_cost|
+-------------------+------------------+
|               10.6|14.284999999999997|
+-------------------+------------------+



In [20]:
overall_kpi = df_kpi.selectExpr(
    "sum(shiprocket_price) as total_shiprocket_cost",
    "sum(increff_price) as total_increff_cost",
    """
    (sum(shiprocket_price) - sum(increff_price))
    / sum(shiprocket_price) * 100
    as cost_saving_percentage
    """
)
overall_kpi.show()


+---------------------+------------------+----------------------+
|total_shiprocket_cost|total_increff_cost|cost_saving_percentage|
+---------------------+------------------+----------------------+
|                 53.0|142.84999999999997|    -169.5283018867924|
+---------------------+------------------+----------------------+



In [28]:
df_kpi_final = df_kpi.select(
    "service",
    "shiprocket_price",
    "increff_price",
    "cost_difference",
    "cheaper_provider"
)
df_kpi_final.show(truncate=False)


+--------------------------------+----------------+-------------+---------------+----------------+
|service                         |shiprocket_price|increff_price|cost_difference|cheaper_provider|
+--------------------------------+----------------+-------------+---------------+----------------+
|Inbound (Fresh Stock and RTO)   |4.0             |4.0          |0.0            |Same / NA       |
|Outbound                        |7.0             |11.0         |-4.0           |Shiprocket      |
|Storage Fee/Cft                 |25.0            |0.15         |24.85          |Increff         |
|Customer Return with Detailed QC|6.0             |15.5         |-9.5           |Shiprocket      |
|Inventory management            |NULL            |3.0          |NULL           |Same / NA       |
|Monthly Fill Rate               |NULL            |99.8         |NULL           |Same / NA       |
|Daily Processing Capacity       |11.0            |NULL         |NULL           |Same / NA       |
|Customer 

In [22]:
df_kpi_final.write \
    .mode("overwrite") \
    .parquet("/content/output/warehouse_cost_kpis")


In [23]:
spark.read.parquet("/content/output/warehouse_cost_kpis").show(truncate=False)


+--------------------------------+----------------+-------------+---------------+----------------+
|service                         |shiprocket_price|increff_price|cost_difference|cheaper_provider|
+--------------------------------+----------------+-------------+---------------+----------------+
|Inbound (Fresh Stock and RTO)   |4.0             |4.0          |0.0            |Same / NA       |
|Outbound                        |7.0             |11.0         |-4.0           |Shiprocket      |
|Storage Fee/Cft                 |25.0            |0.15         |24.85          |Increff         |
|Customer Return with Detailed QC|6.0             |15.5         |-9.5           |Shiprocket      |
|Inventory management            |NULL            |3.0          |NULL           |Same / NA       |
|Monthly Fill Rate               |NULL            |99.8         |NULL           |Same / NA       |
|Daily Processing Capacity       |11.0            |NULL         |NULL           |Same / NA       |
|Customer 

In [29]:
df_kpi_final.explain(True)


== Parsed Logical Plan ==
'Project ['service, 'shiprocket_price, 'increff_price, 'cost_difference, 'cheaper_provider]
+- Project [service#39, shiprocket_price_raw#40, increff_price_raw#41, shiprocket_price#101, increff_price#102, cost_difference#141, CASE WHEN (shiprocket_price#101 < increff_price#102) THEN Shiprocket WHEN (shiprocket_price#101 > increff_price#102) THEN Increff ELSE Same / NA END AS cheaper_provider#250]
   +- Project [service#39, shiprocket_price_raw#40, increff_price_raw#41, shiprocket_price#101, increff_price#102, cost_difference#141, CASE WHEN (shiprocket_price#101 < increff_price#102) THEN Shiprocket WHEN (shiprocket_price#101 > increff_price#102) THEN Increff ELSE Same / NA END AS cheaper_provider#142]
      +- Project [service#39, shiprocket_price_raw#40, increff_price_raw#41, shiprocket_price#101, increff_price#102, (shiprocket_price#101 - increff_price#102) AS cost_difference#141]
         +- Filter (isnotnull(shiprocket_price#101) OR isnotnull(increff_price#1