**Project Name** : Optimization of Supply Moves (OSM) <br/>
**Nestle SPOCs** :Aris Anadilla , Emily Knaus <br/>
**Created By** : Akansha Rana <br/>
**Purpose** : To Identify the Inventory that is on Risk at a DC and Recommend STOs to other DCs having demand to consume this AtRisk Inventory, to reduce the wastage <br/>
**Date** : <br/>
**Functionality** : This Notebook uses the Excess Inv, Excess Demand Data, and recommend inv movement between locations<br/>
1. First step is to create all possible combination of Excess Inv Excess Demand
2. Then It runs optimizer, that will recommend the best suited combination for Inv movement based on value of qty that can be moved and shipping cost for that movement

**Output Folder** : 
1. solutions/ift/ift/outbount/Test
2. solutions/ift/ift/outbound/Test/Test/


**Dependent Notebooks** : 1. Azure_Connection, 2. InputCreation_PowerApps </br>

**Widgets Definition**: </br>
<small>
1.Network:Network for which model should be run(DataType = String, Range = ("Ambient", "Frozen", "Chilled", "NPW")</br>
2.Scenerio:Scenario based on the demand componesnts considered(DataType = String, Range = ("Scenerio1", "Scenerio2"))</br>
3.RunType:Type of the model Run - Regular means it is a standard run usable for every planner, Adhoc means it is run by a specific planner for specific purpose (DataType = String, Range = ('Regular','Adhoc'))</br>
4.DCExcFromSource: List of Source DCs, that should be excluded from the Run(No inventory will be recommended out of this DC) (DataType = string)</br>
5.DCExcFromDestination: List of Destination DCs, that should be excluded from the Run(No inventory will be recommended into this DC) (DataType = string)</br>
6.DispatchDateHorizon: period where we can dispatch an STO. Example - if the given value is 120 the dispatch date for any recommendation can be in next 120 days (DataType = integer)[1,356]</br>
7.BufferDaysForDispatch: Buffer Period between Model Run and 1st dispatch date. Example - if the given value is 2 the dispatch date for any recommendation can not be in next 2 days (DataType = integer)[0,356]</br>
8.SalvageDateHorizon: Horizon For Salvage Date. Example - if the given value is 120 all the batch whose salvage date is in next 120 days, will be considered for movement (DataType = integer)[0,356]</br>
9.MinRecPalletQty: Minimum pallet Qty for a move. Example - if the given value is 1, all the recommendation aggregated at source-destination-Material Level will have Recommended Qty greater than 1 pallet(DataType = float)[>=0]</br>
10.SKUExclusionList: List Of SKUs which should not be moved. Example - these material will not be recommended(DataType = comma seperated list of material number)</br>
11.FreshnessThreshold: Freshness range to be considered for movement. Example - if the given value is 10-100, batches having freshness % value in this range will be considered for the run). Datatype (min and max threshold seperated with '-'</br>
12.CPMThreshold: threshold for shipping cost to recommended value ratio. Example - if the given value is 50, Every recommendation will have the CPM Ratio (Shipping cost / Recommended $ Value) less than 50%. Datatype (float) Range[0,100]</br>

*limitations of widgets value - </br>
1.BufferDaysForDispatch <= DispatchDateHorizon</br>
2.DispatchDateHorizon <= SalvageDateHorizon</br>


#Azure Connection

In [0]:
%run ./Azure_Connection

In [0]:
%run ./control_file

In [0]:
#  importing required libraries
import numpy as np
import pandas as pd
from pyspark.sql.types import DateType
import pyspark.sql.functions as sf
from pyspark.sql.functions import current_date, col
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql import Window
from datetime import date, datetime, timedelta
from delta import DeltaTable
from zoneinfo import ZoneInfo



# Defining parameters

In [0]:
# setting the timezone and the time value
# target_timezone = "America/New_York"
us_eastern_dt = datetime.now(tz=ZoneInfo(target_timezone))
print(us_eastern_dt)

2024-03-08 11:54:31.218747+00:00


In [0]:
# widget Creation For multiple parameters
# 1. List of Source DCs, that should be excluded from the Run
dbutils.widgets.text("DCExcFromSource", "", "Exclude DCs from Source")
# 2. List of Destination DCs, that should be excluded from the Run
dbutils.widgets.text("DCExcFromDestination", "", "Exclude DCs from Destination")
# 3. period where we can dispatch an STO
dbutils.widgets.text("DispatchDateHorizon", "120", "Horizon For Dispatch Date")
# 4. Buffer Period between Model Run and 1st dispatch date
dbutils.widgets.text("BufferDaysForDispatch", "2", "Buffer Days for Dispatch")
# 5. Horizon For Salvage Date
dbutils.widgets.text("SalvageDateHorizon", "120", "Horizon For Salvage Date")
# 6. Minimum pallet Qty for a move
dbutils.widgets.text("MinRecPalletQty", "1", "Minimum Rec Pallet Qty")
# 7. List Of SKUs which should not be moved
dbutils.widgets.text("SKUExclusionList", "", "SKUs which should not be moved")
# 8. Freshness range to be considered for movement
dbutils.widgets.text(
    "FreshnessThreshold", "0-100", "Freshness Range for Inventory to be moved"
)
# 9. threshold for shipping cost to recommended value ratio
dbutils.widgets.text("CPMThreshold", "100", "Cost Per Move Threshold for Moves")
# 10. Type of Run, Regular or Adhoc
dbutils.widgets.dropdown("RunType", "Regular", ["Regular", "Adhoc"], "Type of Run")

In [0]:
# Getting values from Widgets
DCExcFromSource = dbutils.widgets.get("DCExcFromSource")
DCExcFromSource = [i for i in DCExcFromSource.split(",")]

DCExcFromDestination = dbutils.widgets.get("DCExcFromDestination")
DCExcFromDestination = [i for i in DCExcFromDestination.split(",")]

dispatch_date_horizon = dbutils.widgets.get("DispatchDateHorizon")
dispatch_date_horizon = int(dispatch_date_horizon)

BufferDaysForDispatch = dbutils.widgets.get("BufferDaysForDispatch")
BufferDaysForDispatch = int(BufferDaysForDispatch)

salvage_date_horizon = dbutils.widgets.get("SalvageDateHorizon")
salvage_date_horizon = int(salvage_date_horizon)

MinRecPalletQty = dbutils.widgets.get("MinRecPalletQty")
MinRecPalletQty = float(MinRecPalletQty)

SKUExclusionList = dbutils.widgets.get("SKUExclusionList")
try:
    SKUExclusionList = [i for i in SKUExclusionList.split(",")]
except:
    SKUExclusionList = []

FreshnessThreshold = dbutils.widgets.get("FreshnessThreshold")
try:
    FreshnessThreshold_min = float(FreshnessThreshold.aplit("-")[0])
    FreshnessThreshold_max = float(FreshnessThreshold.aplit("-")[1])
except:
    FreshnessThreshold_min = 0
    FreshnessThreshold_max = 100
CPMThreshold = dbutils.widgets.get("CPMThreshold")
try:
    CPMThreshold = float(CPMThreshold) / 100
except:
    CPMThreshold = 1

RunType = dbutils.widgets.get("RunType")
if RunType == "Adhoc":
    RunType_path = "Adhoc/"
else:
    RunType_path = "Regular/"
print(
    "DCExcFromSource =",
    DCExcFromSource,
    "\nDCExcFromDestination =",
    DCExcFromDestination,
    "\ndispatch_date_horizon =",
    dispatch_date_horizon,
    "\nBufferDaysForDispatch =",
    BufferDaysForDispatch,
    "\nsalvage_date_horizon =",
    salvage_date_horizon,
    "\nMinRecPalletQty =",
    MinRecPalletQty,
    "\nSKUExclusionList =",
    SKUExclusionList,
    "\nFreshnessThreshold =",
    FreshnessThreshold,
    "\nFreshnessThreshold_MIN =",
    FreshnessThreshold_min,
    "\nFreshnessThreshold_MAX =",
    FreshnessThreshold_max,
    "\nCPMThreshold =",
    CPMThreshold,
    "\nRunType =",
    RunType,
    "\nRunType_path =",
    RunType_path,
)

DCExcFromSource = [''] 
DCExcFromDestination = [''] 
dispatch_date_horizon = 120 
BufferDaysForDispatch = 2 
salvage_date_horizon = 120 
MinRecPalletQty = 1.0 
SKUExclusionList = [''] 
FreshnessThreshold = 0-100 
FreshnessThreshold_MIN = 0 
FreshnessThreshold_MAX = 100 
CPMThreshold = 1.0 
RunType = Regular 
RunType_path = Regular/


In [0]:
# parameters for Network and Demand Scenario
# the destination path and input path for the data will be defined based on the Network, Scenario and RunTYpe
dbutils.widgets.dropdown('Network','Ambient',['Ambient','Frozen','Chilled','NPW'],'Select Network')
Network = dbutils.widgets.get('Network')
dbutils.widgets.dropdown('Scenerio','Scenerio1',['Scenerio1','Scenerio2'],'Select Scenerio')
Scenerio = dbutils.widgets.get('Scenerio')
print(Network,Scenerio)
if Scenerio == 'Scenerio1':
    Scenerio_path = ''
elif Scenerio == 'Scenerio2':
    Scenerio_path = '_'+Scenerio
else:
    raise Exception('Wrong Scenerio Entered')

if ((Network == 'Ambient')|(Network == 'Frozen')|(Network == 'Chilled')|(Network == 'NPW')):
  pass
else:
    raise Exception('Wrong Network Entered')

print(Network,Scenerio_path)
if RunType == 'Regular':
    destination_path = 'abfss://root@'+stgAct+'.dfs.core.windows.net'+folder_path+'Outbound/ModelOutput/'+Network+Scenerio_path+'/'
    input_path = 'abfss://root@'+stgAct+'.dfs.core.windows.net'+folder_path+'Inbound/Model_Input/'+Network+Scenerio_path+'/'
else:
    destination_path = 'abfss://root@'+stgAct+'.dfs.core.windows.net'+folder_path+'Outbound/ModelOutput/'+Network+Scenerio_path+'_Adhoc/'
    input_path = 'abfss://root@'+stgAct+'.dfs.core.windows.net'+folder_path+'Inbound/Model_Input/'+Network+Scenerio_path+'_Adhoc/'
print('destination_path =',destination_path)
print('input_path = ',input_path)

Frozen Scenerio2
Frozen _Scenerio2
destination_path = abfss://root@nusadhprsadatalakedev.dfs.core.windows.net/solutions/ift/ift/outbound/Tredence_OSM/Outbound/ModelOutput/Frozen_Scenerio2/
input_path =  abfss://root@nusadhprsadatalakedev.dfs.core.windows.net/solutions/ift/ift/outbound/Tredence_OSM/Inbound/Model_Input/Frozen_Scenerio2/


In [0]:
# destination path for POwerBI files for Batch level output and AtriskandSalvageInv
BatchLevelOutput_PBIPath = (
    "abfss://root@"
    + stgAct
    + ".dfs.core.windows.net"
    + pbi_folder_path
    + "PowerBI/"
    + Scenerio
    + "/"
    + RunType_path
    + "BatchLevelOutput/"
    + Network
)
AtRiskAndSalvageInv_PBIPath = (
    "abfss://root@"
    + stgAct
    + ".dfs.core.windows.net"
    + pbi_folder_path
    + "PowerBI/"
    + Scenerio
    + "/"
    + RunType_path
    + "AtRiskandSalvageInventory/"
    + Network
)
print(BatchLevelOutput_PBIPath, "\n", AtRiskAndSalvageInv_PBIPath)

abfss://root@nusadhprsadatalakedev.dfs.core.windows.net/solutions/ift/ift/outbound/Tredence_Power_BI/Tredence_OSM/PowerBI/Scenerio2/Regular/BatchLevelOutput/Frozen 
 abfss://root@nusadhprsadatalakedev.dfs.core.windows.net/solutions/ift/ift/outbound/Tredence_Power_BI/Tredence_OSM/PowerBI/Scenerio2/Regular/AtRiskandSalvageInventory/Frozen


In [0]:
today = date.today()
today_date = str(today)
dateformat = (
    today_date.split("-")[0]
    + "/"
    + today_date.split("-")[1]
    + "/"
    + today_date.split("-")[2]
)
ModelRunDate = datetime.strptime(dateformat, "%Y/%m/%d").date()
print(dateformat, ModelRunDate)
print(Network)

2024/03/08 2024-03-08
Frozen


#Reading Data files

In [0]:
# reading the input data and applying required filters
try:
    AtRiskandSalvageInventory = spark.read.format("delta").load(
        input_path + "AtRiskandSalvageInventory"
    )
except Exception as e:
    print(f"Error in loading AtRiskandSalvageInventory data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")


AtRiskandSalvageInventory = (
    AtRiskandSalvageInventory.filter(sf.col("Report_Run_Date") == ModelRunDate)
    .filter(
        sf.col("SalvageDate") < sf.date_add(sf.lit(ModelRunDate), salvage_date_horizon)
    )
    .withColumn(
        "Freshness",
        sf.datediff(col("SalvageDate"), sf.lit(ModelRunDate))
        / sf.datediff(col("SalvageDate"), col("ProductionDate")),
    )
    .filter(
        (col("Freshness") <= FreshnessThreshold_max)
        & (col("Freshness") >= FreshnessThreshold_min)
    )
)

try:
    ExcessDemand_orig = spark.read.format("delta").load(input_path + "ExcessDemand")
except Exception as e:
    print(f"Error in loading ExcessDemand data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

# excess demand data will be filter for excess demand check days and salvage date horizon
# excess demand will be used at parent material level as one  material's excess inv can be used for its other family materials
ExcessDemand_orig = (
    ExcessDemand_orig.filter(sf.col("Report_Run_Date") == ModelRunDate)
    .filter(
        sf.col("Date")
        < sf.date_add(
            sf.lit(ModelRunDate), salvage_date_horizon + EXCESS_DEMAND_CHECK_DAYS - 1
        )
    )
    .groupby(
        sf.col("ParentMaterialID").alias("MaterialID"),
        "Date",
        "Location",
        "WeekStart",
        "Report_Run_Date",
    )
    .agg(
        sf.sum("LocalDemand").alias("LocalDemand"),
        sf.sum("ExcessDemand").alias("ExcessDemand"),
        sf.sum("BatchInventory").alias("BatchInventory"),
    )
)


try:
    ShippingCost = spark.read.format("delta").load(input_path + "Shipping_Cost_Input")
except Exception as e:
    print(f"Error in loading Shipping_Cost_Input data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

ShippingCost = ShippingCost.filter(sf.col("Report_Run_Date") == ModelRunDate)

##Recommended Order Treatment

In [0]:
# reading the already accepted orders by the Business Users. This data will be used to manipulate excess demand and excess numbers, so that these recommendations can not be recommended again. These recommendations are accepted but these are not reflecting in the data
try:
    AcceptedRecommendation = spark.read.parquet(
        "abfss://root@"
        + stgAct
        + ".dfs.core.windows.net"
        + pbi_folder_path
        + "PowerBI/AcceptedRecommendation/*"
    )
except Exception as e:
    print(f"Error in loading AcceptedRecommendation data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

# taking last 7 day's accepted recommendations
AcceptedRecommendation = AcceptedRecommendation.filter(
    col("Accepted_order_date") > sf.lit(ModelRunDate - timedelta(7))
)


# getting total accepted recomm qty from source and to Dest
recm_qty_from_source = AcceptedRecommendation.groupby(
    col("SourceDC").alias("Location"), "MaterialID", "Batch"
).agg(sf.sum("FinalAcceptedQty").alias("QtyMovedOut"))
recm_qty_to_dest = (
    AcceptedRecommendation.groupby(
        col("DestinationDC").alias("Location"),
        col("ParentMaterialID").alias("MaterialID"),
        col("SalvageDate").alias("Date"),
    )
    .agg(sf.sum("FinalAcceptedQty").alias("QtyMovedIn"))
    .withColumn("Date", col("Date") - 1)
)

In [0]:
# From the total excess inv we will remove the Already recommended and accepted qty, so we do not make repeated recom.
AtRiskandSalvageInventory1 = (
    AtRiskandSalvageInventory.join(
        recm_qty_from_source, ["Location", "MaterialID", "Batch"], "left"
    )
    .fillna(0, ["QtyMovedOut"])
    .withColumn(
        "QuantityinUoM",
        sf.greatest(col("QuantityinUoM") - col("QtyMovedOut"), sf.lit(0)),
    )
)

# for the destination, accepted qty will be considered as the batch inv onone day before the salvage date and running demand and excess demand will be recalculated
ExcessDemand_orig1 = (
    ExcessDemand_orig.join(recm_qty_to_dest, ["Location", "MaterialID", "Date"], "left")
    .fillna(0, ["QtyMovedIn"])
    .withColumn("BatchInventory", sf.col("BatchInventory") + sf.col("QtyMovedIn"))
)

In [0]:
# doing the running forecast calculations
data = (
    ExcessDemand_orig1.withColumn(
        "BatchInventory", sf.col("BatchInventory").cast(IntegerType())
    )
    .withColumn("Date", sf.col("Date").cast(DateType()))
    .withColumn("MaterialID", sf.col("MaterialID").cast(IntegerType()))
    .withColumn("LocalDemand", sf.col("LocalDemand").cast(IntegerType()))
    .withColumn(
        "record_index", sf.concat_ws("_", sf.col("MaterialID"), sf.col("Location"))
    )
    .sort("record_index", "Date")
)

# this function calculate the running demand on every day(cumalative demand on that day - demand that can be fulfilled by salvaging inventory before that day). if the value is -ve then it should be capped at 0
def RunningDemand(inv_demand_data):
    """
    Calculate running demand values for each row in the input DataFrame.

    Args:
        inv_demand_data (pd.DataFrame): Input DataFrame containing data for Running Demand calculation.

    Returns:
        pd.DataFrame: DataFrame with additional columns for Running demand, and index number.

    Note:
        This function assumes that the input DataFrame has columns such as BatchInventory and localDemand

        The function calculates RunningDemand, indexnumber based on the provided logic.
    """
    inv_demand_data["RunningDemand"] = np.nan
    inv_demand_data["x"] = np.nan  # index

    inv_demand_data["BatchInventory"] = inv_demand_data["BatchInventory"].fillna(0)
    for index, row in inv_demand_data.iterrows():
        inv_demand_data["x"].iloc[index] = index
        if index == 0:
            inv_demand_data["RunningDemand"].iloc[index] = inv_demand_data[
                "LocalDemand"
            ].iloc[index]
        else:
            if (
                inv_demand_data["BatchInventory"].iloc[index - 1]
                <= inv_demand_data["RunningDemand"].iloc[index - 1]
            ):
                val1 = (
                    inv_demand_data["RunningDemand"].iloc[index - 1]
                    - inv_demand_data["BatchInventory"].iloc[index - 1]
                )
                inv_demand_data["RunningDemand"].iloc[index] = (
                    val1 + inv_demand_data["LocalDemand"].iloc[index]
                )
            else:
                inv_demand_data["RunningDemand"].iloc[index] = inv_demand_data[
                    "LocalDemand"
                ].iloc[index]
    return inv_demand_data

In [0]:
# applying the RunningDemand function and creating the dataframe that contains running forecast
import numpy as np

schema = "MaterialID integer, Location string, Date date, WeekStart date, BatchInventory integer,QtyMovedIn int, LocalDemand integer, record_index string, RunningDemand double,ExcessDemand int, x int,Report_Run_Date date"
result = (
    data.groupBy("record_index")
    .applyInPandas(RunningDemand, schema=schema)
    .withColumn(
        "ExcessDemand",
        sf.greatest(sf.col("RunningDemand") - sf.col("BatchInventory"), sf.lit(0)),
    )
)

In [0]:
# The data is converted on Parent
ExcessInventory = (
    AtRiskandSalvageInventory.groupby(
        sf.col("ParentMaterialID").alias("MaterialID"),
        "Location",
        "SalvageDate",
        "UoM",
        "Division",
        "ProductLevel3CategoryID",
        "ProductWeightUnitOfMeasure",
    )
    .agg(
        sf.sum("QuantityinUoM").alias("QuantityinUoM"),
        sf.mean("ProductStandardCost").cast("decimal(38,6)").alias("ProductStandardCost"),
        sf.avg("CostofGoodsSold").alias("CostofGoodsSold"),
        sf.avg("UniversalBasePrice").alias("UniversalBasePrice"),
        sf.avg("ProductPlanningUnitsPerPallet").cast("decimal(38,19)").alias("ProductPlanningUnitsPerPallet"),
        sf.avg("ProductPlanningUnitsPerCase").cast("decimal(38,19)").alias("ProductPlanningUnitsPerCase"),
        sf.min("StackabilityFactor").alias("StackabilityFactor"),
        sf.avg("ProductSalesGrossWeight").alias("ProductSalesGrossWeight"),
        sf.avg("Price").alias("Price"),
    )
    .withColumn("PUMToWeightFactor", sf.col("ProductSalesGrossWeight"))
    .filter(~(sf.col("ProductPlanningUnitsPerPallet").isNull()))
)

In [0]:
try:
    material_classification = spark.read.format("parquet").load(
        "abfss://root@"
        + stgAct
        + ".dfs.core.windows.net"
        + folder_path
        + "Inbound/AmbientMaterialClassification/part*"
    )
except Exception as e:
    print(f"Error in loading AmbientMaterialClassification data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

material_classification_1 = material_classification.select(
    "MaterialID", "IsForWalmartAndPublix"
)

try:
    nutrition_product = spark.read.format("delta").load(
        "abfss://root@"
        + stgAct
        + ".dfs.core.windows.net"
        + folder_path
        + "Inbound/AmbientNutritionMaterial"
    )
except Exception as e:
    print(f"Error in loading AmbientNutritionMaterial data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

In [0]:
# window for calculating the cumalative demand till current date
Windowval1 = (
    Window.partitionBy("MaterialID", "Location")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, 0)
)

# window for taking minimum of demand of next EXCESS_DEMAND_CHECK_DAYS days
Windowval2 = (
    Window.partitionBy("Location", "MaterialID")
    .orderBy("Date")
    .rowsBetween(0, EXCESS_DEMAND_CHECK_DAYS)
)

# making some concat of columns so those can be used as a key for varioues calculations
ExcessDemand = (
    result.withColumn("dest_sku", sf.concat("Location", sf.lit("_"), "MaterialID"))
    .withColumn(
        "dest_sku_date",
        sf.concat("Location", sf.lit("_"), "MaterialID", sf.lit("_"), "Date"),
    )
    .withColumn("Cum_LocalDemand", sf.sum("LocalDemand").over(Windowval1))
    .withColumn("ExcessDemand_backup", sf.col("ExcessDemand"))
    .withColumn("Min_Of_Upcoming_Days", sf.min("ExcessDemand").over(Windowval2))
    .fillna(0, ["Min_Of_Upcoming_Days"])
    .withColumnRenamed("Min_Of_Upcoming_Days", "Min_Of_Upcoming_Days")
)

#Possible Combination

In [0]:
# making possible Excess inv - Excess Demand comb. where one material's excess inv at one location can be used to fullfill other location's excess demand

dc_comb = ExcessInventory.join(
    ExcessDemand.select(
        "MaterialID", "Location", "Date", "ExcessDemand", "Min_Of_Upcoming_Days"
    ),
    (
        (ExcessInventory["MaterialID"] == ExcessDemand["MaterialID"])
        & (ExcessInventory["SalvageDate"] == sf.date_add(ExcessDemand["Date"], 1))
    ),
    "left",
).select(
    ExcessInventory["*"],
    ExcessDemand.Location.alias("Destination"),
    ExcessDemand.Date,
    ExcessDemand.ExcessDemand,
    ExcessDemand.Min_Of_Upcoming_Days,
)
dc_comb = dc_comb.withColumnRenamed("Location", "Source").filter(
    (sf.col("Source") != sf.col("Destination"))
    & (sf.col("ExcessDemand") > 0)
    & (sf.col("Destination").isNotNull())
)

In [0]:
#Ambient Filters
# applying som DC - sku limitations on the combinations
dc_comb_1 = dc_comb.join(material_classification_1, ["MaterialID"], "left").join(
    nutrition_product, ["MaterialID"], "left"
)

# filtering out combination where non starbucks are going to StarbucksSolutionsLabs
dc_comb_1 = (
    dc_comb_1.filter(
        (
            (~sf.col("Destination").isin(StarbucksSolutionsLabs))
            | (sf.col("Division") == "STARBUCKS")
        )
    )
    .filter(
        (
            (~sf.col("Destination").isin(OnlyEvaporatedMilkDC))
            | (sf.col("ProductLevel3CategoryID").isin(PL3_for_EvaporatedMilk))
        )
    )  # filtering out combination where non EvaporatedMilk are going to OnlyEvaporatedMilkDC
    .filter(
        (
            (~sf.col("Destination").isin(OnlyNutritionDC))
            | (sf.col("IsNutrition") == "Y")
        )
    )  # filtering out combination where non Nutrition are going to OnlyNutritionDC
    .filter(
        (
            (~sf.col("Destination").isin(OnlyWalmartAndPublixDC))
            | (sf.col("IsForWalmartAndPublix") == "Y")
        )
    )  # filtering out combination where non Nutrition are going to OnlyWalmartAndPublixDC
)


# filtering out combination for sweet_earth_materials_only(thes DCs can only have Sweet materials, and inv will not come out)
dc_comb_1 = dc_comb_1.filter(
    (
        (~sf.col("Destination").isin(OnlySweetEarthMaterialsDC))
        | (sf.col("Division") == "SWEET EARTH")
    )
).filter(
    ~(
        (sf.col("Source").isin(OnlySweetEarthMaterialsDC))
        & (sf.col("Division") == "SWEET EARTH")
    )
)

# Frozen filter
# filtering out combination for pizza only DCs and Hotpockets
dc_comb_1 = dc_comb_1.filter(
    ((~sf.col("Destination").isin(OnlyPizzaDC)) | (sf.col("Division") == "PIZZA"))
).filter(
    (
        (~sf.col("Destination").isin(OnlyHotPocketDC))
        | (
            (sf.col("ProductLevel3CategoryID").isin(PL3_for_HotPockets))
            | (sf.col("MaterialID").isin(SKU_for_HotPockets))
        )
    )
)

# Chilled filter
dc_comb_1 = dc_comb_1.filter(~sf.col("Source").isin(EXCLUDE_SOURCE_CHILLED))

# Filtering Possible STR based on the Exclude Source/Destination List passed by PowerApps
dc_comb_1 = (
    dc_comb_1.filter(~sf.col("Source").isin(DCExcFromSource))
    .filter(~sf.col("Destination").isin(DCExcFromDestination))
    .filter(
        ~sf.col("MaterialID").isin(SKUExclusionList)
    )  # Filtering out materials that should not be moved
)

In [0]:
# adding the shipping cost, distance and Leadtime to the combinations
dc_comb_2 = dc_comb_1.join(ShippingCost, ["Source", "Destination"], "left").select(
    dc_comb_1["*"],
    ShippingCost["Distance"],
    ShippingCost["ShippingCost"],
    ShippingCost["leadtime"].cast("int").alias("LeadTime"),
)

##dispatch date

In [0]:
# creating a list of possible dispathch date as per the given variables. Dispatch date can not be on a weekend
today = datetime.strptime(dateformat, "%Y/%m/%d").date()
DATE = []
for i in range(BufferDaysForDispatch, dispatch_date_horizon):
    DATE.append([today + timedelta(i)])
dispatchDate = spark.createDataFrame(DATE, ["Date"])
dispatchDate = dispatchDate.withColumnRenamed("Date", "dispatchDate").filter(
    sf.dayofweek("dispatchDate").isin([2, 3, 4, 5, 6])
)  # only including weekdays

In [0]:
# adding possible deispathc date to all combinations and calculating arrival date based on the lead time and dispatch date
# also arrival date should be 2 days before the salvage date of the inv
dc_dates_comb = (
    dc_comb_2.join(dispatchDate)
    .withColumn("arrivaldate", sf.col("dispatchDate") + sf.col("LeadTime"))
    .filter(sf.col("arrivaldate") <= sf.date_add(sf.col("Date"), -2))
    .filter(sf.col("dispatchDate") > sf.date_add(sf.lit(ModelRunDate), 2))
)

In [0]:
# calculating cum demand for salvage date and the arrival date, so the demand between these two dates can be calculated and utilized for the excess inv
ExcessDemand_temp = ExcessDemand.select(
    "MaterialID",
    ExcessDemand.Location.alias("Destination"),
    sf.date_add(ExcessDemand["Date"], 1).alias("SalvageDate"),
    "Cum_LocalDemand",
)

AllComb = dc_dates_comb.join(
    ExcessDemand_temp, ["MaterialID", "Destination", "SalvageDate"], "left"
).select(
    dc_dates_comb["*"],
    ExcessDemand_temp.Cum_LocalDemand.alias("Cum_LocalDemandOnSalvageMinusOne"),
)

ExcessDemand_temp1 = ExcessDemand.select(
    "MaterialID",
    ExcessDemand.Location.alias("Destination"),
    sf.date_add(ExcessDemand["Date"], 0).alias("ArrivalDate"),
    "Cum_LocalDemand",
)

AllComb = AllComb.join(
    ExcessDemand_temp1, ["MaterialID", "Destination", "ArrivalDate"], "left"
).select(
    AllComb["*"],
    ExcessDemand_temp1.Cum_LocalDemand.alias("Cum_LocalDemandOnArrivalDate"),
)


AllComb = (
    AllComb.withColumn(
        "LocalDemandBetweenArrivalAndSalvage",
        sf.col("Cum_LocalDemandOnSalvageMinusOne")
        - sf.col("Cum_LocalDemandOnArrivalDate"),
    )
    .withColumn(
        "IntermediateDemandColumn",
        sf.greatest(
            sf.col("ExcessDemand") - sf.col("LocalDemandBetweenArrivalAndSalvage"),
            sf.lit(0),
        ),
    )
    .withColumn(
        "ActualExcessDemand",
        sf.col("ExcessDemand") - sf.col("IntermediateDemandColumn"),
    )
)

In [0]:
# creating various key column by concating multiple columns
AllComb = (
    AllComb.withColumn(
        "source_dest_sku_salvagedate",
        sf.concat(
            "Source",
            sf.lit("_"),
            "Destination",
            sf.lit("_"),
            "MaterialID",
            sf.lit("_"),
            "SalvageDate",
        ),
    )
    .withColumn(
        "source_sku_salvagedate",
        sf.concat("Source", sf.lit("_"), "MaterialID", sf.lit("_"), "SalvageDate"),
    )
    .withColumn(
        "source_sku_salvagedate_date",
        sf.concat(
            "Source",
            sf.lit("_"),
            "MaterialID",
            sf.lit("_"),
            "SalvageDate",
            sf.lit("_"),
            "SalvageDate",
        ),
    )
    .withColumn(
        "source_dest_sku_salvagedate_arrivaldate",
        sf.concat(
            "Source",
            sf.lit("_"),
            "Destination",
            sf.lit("_"),
            "MaterialID",
            sf.lit("_"),
            "SalvageDate",
            sf.lit("_"),
            "arrivaldate",
        ),
    )
    .withColumn(
        "source_dest_arrivaldate",
        sf.concat("Source", sf.lit("_"), "Destination", sf.lit("_"), "arrivaldate"),
    )
    .withColumn(
        "source_dest_dispatchDate",
        sf.concat("Source", sf.lit("_"), "Destination", sf.lit("_"), "dispatchDate"),
    )
    .withColumn("dest_sku", sf.concat("Destination", sf.lit("_"), "MaterialID"))
    .withColumn(
        "dest_sku_date",
        sf.concat("Destination", sf.lit("_"), "MaterialID", sf.lit("_"), "Date"),
    )
    .withColumn(
        "dest_sku_date_arrivaldate",
        sf.concat(
            "Destination",
            sf.lit("_"),
            "MaterialID",
            sf.lit("_"),
            "Date",
            sf.lit("_"),
            "arrivaldate",
        ),
    )
)

In [0]:
# calculating per unit cost and per unit shipping cost so, these can be used as the objective function of the solver
AllComb = AllComb.withColumn(
    "CostPerUOM", sf.coalesce("Price", "ProductStandardCost", "CostofGoodsSold")
).withColumn(
    "ShippingCostFactor", -sf.col("PUMToWeightFactor") * sf.col("ShippingCost") / 42000
)

# calculating SalvageDateFactor and ArrivalDateFactor, such that model recommend the earlier salvaging inv first and it also prefer later salvage date
AllComb = AllComb.withColumn(
    "SalvageDateFactor",
    salvage_date_horizon / (sf.datediff(sf.col("SalvageDate"), sf.lit(ModelRunDate))),
).withColumn(
    "ArrivalDateFactor",
    (sf.datediff(sf.col("arrivaldate"), sf.lit(ModelRunDate))) / salvage_date_horizon,
)

# objective factor for the Solver
AllComb = AllComb.withColumn(
    "OBJ",
    (sf.col("CostPerUOM") + sf.col("ShippingCostFactor")) * sf.col("SalvageDateFactor"),
)

In [0]:
# Add a new column 'Report_Run_Date' and Set values to the current date and then extract to Year, Month, Day
AllComb = (
    AllComb.withColumn("Report_Run_Date", sf.current_date())
    .withColumn("Year", sf.substring("Report_Run_Date", 1, 4))
    .withColumn("Month", sf.substring("Report_Run_Date", 6, 2))
    .withColumn("Day", sf.substring("Report_Run_Date", 9, 2))
)

try:
    AllComb_path = destination_path + "AllPossibleCombinations"
    AllComb_delta_table = DeltaTable.forPath(spark, AllComb_path)
    AllComb_delta_table.delete(sf.col("Report_Run_Date") == ModelRunDate)
except:
    print("table does not exist")
# Save the DataFrame in file on ADLS
AllComb.write.format("delta")\
    .mode('append')\
    .option("overwriteSchema", "true")\
    .option("mergeSchema", "true")\
    .partitionBy("Year","Month","Day")\
    .save(destination_path+'AllPossibleCombinations')

In [0]:
try:
    AllComb = spark.read.format("delta").load(
        destination_path + "AllPossibleCombinations"
    )
except Exception as e:
    print(f"Error in loading AllPossibleCombinations data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")
AllComb = AllComb.filter(sf.col("Report_Run_Date") == ModelRunDate)

# Call Optimizer Run

In [0]:
total_comb_count = AllComb.count()
print(total_comb_count)

In [0]:
# if there is already existing data in the SalvageDateLevelOutput folder, this will delete this, so there will be no duplicates for the day
try:
    SalvageDateLevelOutput_path = destination_path+'SalvageDateLevelOutput'
    SalvageDateLevelOutput_delta_table = DeltaTable.forPath(spark, SalvageDateLevelOutput_path)
    SalvageDateLevelOutput_delta_table.delete(sf.col('Report_Run_Date') == ModelRunDate)
except:
    print('table not exist')

In [0]:
# this cmd calls a notebook which will run an optimizer.
# the data size is more so, it is distributed in multiple parts of 5000
from pytz import timezone

increment_value = 5000
start = datetime.now(timezone("Asia/Kolkata"))
print("start time", start)
i = 0
while True:
    if i > total_comb_count:
        print("Run Completed")
        break
    result = dbutils.notebook.run(
        "./Model_Optimizer",
        4000,
        {
            "partition_path": str(i),
            "increment_value": str(increment_value),
            "destination_path": destination_path,
            "dateformat": dateformat,
        },
    )
    print(i, result)
    i += increment_value

end = datetime.now(timezone("Asia/Kolkata"))
print("end time", end)
print("Time Taken = ", end - start)

# Distribution on batch level

In [0]:
try:
    STO_qty = spark.read.format("delta").load(
        destination_path + "SalvageDateLevelOutput"
    )
except Exception as e:
    print(f"Error in loading SalvageDateLevelOutput data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")
STO_qty = STO_qty.filter(sf.col("Report_Run_Date") == ModelRunDate)

In [0]:
# THE STO_qty contains recommendations between two dcs on parent material and salvagedate level. Now the second optimization will be run to distribute the recommended qty on child material and batch level
batchinv = AtRiskandSalvageInventory.select(
    sf.col("ParentMaterialID").alias("MaterialID"),
    "Location",
    "Batch",
    "SalvageDate",
    "QuantityinUoM",
    sf.col("MaterialID").alias("ChildMaterialID"),
).withColumn(
    "source_sku_batch_salvagedate_child",
    sf.concat(
        sf.col("Location"),
        sf.lit("_"),
        sf.col("MaterialID"),
        sf.lit("_"),
        sf.col("Batch"),
        sf.lit("_"),
        sf.col("SalvageDate"),
        sf.lit("_"),
        sf.col("ChildMaterialID"),
    ),
)

sku_salvage_output = STO_qty.select(
    "MaterialID",
    STO_qty.Source.alias("Location"),
    "SalvageDate",
    STO_qty.Destination.alias("DestinationDC"),
    "ArrivalDate",
    "QtyMoved",
).withColumn(
    "source_dest_sku_salvagedate_arrivaldate",
    sf.concat(
        sf.col("Location"),
        sf.lit("_"),
        sf.col("DestinationDC"),
        sf.lit("_"),
        sf.col("MaterialID"),
        sf.lit("_"),
        sf.col("SalvageDate"),
        sf.lit("_"),
        sf.col("ArrivalDate"),
    ),
)

In [0]:
# there will be multiple combinations for One parent material and Salvage date (as multiple batch can have same salvage date)
Combinations = sku_salvage_output.join(
    batchinv, ["Location", "MaterialID", "SalvageDate"], "left"
).withColumn(
    "source_dest_sku_batch_salvagedate_arrivaldate_child",
    sf.concat(
        sf.col("Location"),
        sf.lit("_"),
        sf.col("DestinationDC"),
        sf.lit("_"),
        sf.col("MaterialID"),
        sf.lit("_"),
        sf.col("Batch"),
        sf.lit("_"),
        sf.col("SalvageDate"),
        sf.lit("_"),
        sf.col("ArrivalDate"),
        sf.lit("_"),
        sf.col("ChildMaterialID"),
    ),
)

In [0]:
# creating some dictionaries based on the key columns
source_dest_sku_batch_salvagedate_arrivaldate_child_list = (
    Combinations.select("source_dest_sku_batch_salvagedate_arrivaldate_child")
    .rdd.flatMap(lambda x: x)
    .collect()
)

movedqty = sku_salvage_output.select(
    "source_dest_sku_salvagedate_arrivaldate", "QtyMoved"
).rdd.collectAsMap()

batchmaxqty = batchinv.select(
    "source_sku_batch_salvagedate_child", "QuantityinUoM"
).rdd.collectAsMap()

###Decision variables

In [0]:
!pip install pulp

Looking in indexes: https://pypi.org/simple, https://****@pkgs.dev.azure.com/nestle-it/_packaging/nusa-python-package/pypi/simple/
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-50452272-aa1b-47f4-aa8b-3a7211405b38/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
from pulp import *

# creating variable for the optimizer
# 1. first varible will tell if any batch for any perticular source dest, material, salvagedate, arrival date and child material is selected to send or not
# 2. second variable is the qty for selected batch
source_dest_sku_batch_salvagedate_arrivaldate_child_activation = LpVariable.dicts(
    "batchactivation",
    source_dest_sku_batch_salvagedate_arrivaldate_child_list,
    cat="binary",
    lowBound=0,
)
source_dest_sku_batch_salvagedate_arrivaldate_child_qty = LpVariable.dicts(
    "batchqty",
    source_dest_sku_batch_salvagedate_arrivaldate_child_list,
    cat="Integer",
    lowBound=0,
)

In [0]:
# defining the objective function which is maximum recommended qty with minimum distinct batch
objective_value1 = LpProblem("cost", LpMinimize)

totalbatchcount = lpSum(
    source_dest_sku_batch_salvagedate_arrivaldate_child_activation[row]
    for row in source_dest_sku_batch_salvagedate_arrivaldate_child_list
)
totalqty = lpSum(
    source_dest_sku_batch_salvagedate_arrivaldate_child_qty[row]
    for row in source_dest_sku_batch_salvagedate_arrivaldate_child_list
)

objective_value1 += totalbatchcount - totalqty

In [0]:
# calculating the qty on source_dest_sku_salvagedate_arrivaldate and source_sku_batch_salvagedate_child_qty level using the qty variable
source_dest_sku_salvagedate_arrivaldate_qty = {}
source_sku_batch_salvagedate_child_qty = {}
for row in source_dest_sku_batch_salvagedate_arrivaldate_child_qty:
    source, dest, sku, batch, salvagedate, arrivaldate, child = row.split("_")
    source_dest_sku_salvagedate_arrivaldate_temp = (
        source + "_" + dest + "_" + sku + "_" + salvagedate + "_" + arrivaldate
    )
    source_sku_batch_salvagedate_child_temp = (
        source + "_" + sku + "_" + batch + "_" + salvagedate + "_" + child
    )

    if (
        source_dest_sku_salvagedate_arrivaldate_temp
        in source_dest_sku_salvagedate_arrivaldate_qty
    ):
        source_dest_sku_salvagedate_arrivaldate_qty[
            source_dest_sku_salvagedate_arrivaldate_temp
        ] += source_dest_sku_batch_salvagedate_arrivaldate_child_qty[row]
    else:
        source_dest_sku_salvagedate_arrivaldate_qty[
            source_dest_sku_salvagedate_arrivaldate_temp
        ] = source_dest_sku_batch_salvagedate_arrivaldate_child_qty[row]

    if (
        source_sku_batch_salvagedate_child_temp
        in source_sku_batch_salvagedate_child_qty
    ):
        source_sku_batch_salvagedate_child_qty[
            source_sku_batch_salvagedate_child_temp
        ] += source_dest_sku_batch_salvagedate_arrivaldate_child_qty[row]
    else:
        source_sku_batch_salvagedate_child_qty[
            source_sku_batch_salvagedate_child_temp
        ] = source_dest_sku_batch_salvagedate_arrivaldate_child_qty[row]

In [0]:
# applying constraint for the sum of splitted qty on batch level and Qty on salvage date level
for row in movedqty.keys():
    objective_value1 += (
        source_dest_sku_salvagedate_arrivaldate_qty[row] <= movedqty[row]
    )
# applying constraint for the sum of selected qty of a batch and its available excess inv
for row in source_sku_batch_salvagedate_child_qty.keys():
    objective_value1 += source_sku_batch_salvagedate_child_qty[row] <= batchmaxqty[row]

In [0]:
# creating a relationship between both variable. if the activation variable(1st) is 0 then the qty will also be 0, if it 1 only then the qty can be > 0
for row in source_dest_sku_batch_salvagedate_arrivaldate_child_qty:
    source, dest, sku, batch, salvagedate, arrivaldate, child = row.split("_")
    source_sku_batch_salvagedate_child_temp = (
        source + "_" + sku + "_" + batch + "_" + salvagedate + "_" + child
    )

    objective_value1 += (
        source_dest_sku_batch_salvagedate_arrivaldate_child_qty[row]
        <= source_dest_sku_batch_salvagedate_arrivaldate_child_activation[row]
        * batchmaxqty[source_sku_batch_salvagedate_child_temp]
    )

In [0]:
print(len(objective_value1.constraints))
print(len(objective_value1.variables()))

1204
1120


###solver

In [0]:
# running the solver. if the status of the solver is Infeasible, it will raise an error
objective_value1.solve(PULP_CBC_CMD(gapRel=0.05))
print(LpStatus[objective_value1.status])
print(value(objective_value1.objective))

if LpStatus[objective_value1.status] == "Infeasible":
    raise Exception("Infeasible Solution : The Optimizer is not able to find the solution with given constraints and input data")

Optimal
-130842.9293377023


In [0]:
# now we have the result, so we will convert it to a dataframe and will create the final output

model_output1 = spark.createDataFrame(
    [
        (row.name, row.value())
        for row in objective_value1.variables()
        if row.value() > 0
    ],
    ["Decision Variable", "Value"],
)

STO_batch_qty = model_output1.filter(
    model_output1["Decision Variable"].contains("batchqty_")
)
STO_batch_qty = (
    STO_batch_qty.withColumn(
        "SourceDC", sf.split(sf.col("Decision variable"), "\\_").getItem(1)
    )
    .withColumn(
        "DestinationDC", sf.split(sf.col("Decision variable"), "\\_").getItem(2)
    )
    .withColumn(
        "ParentMaterialID", sf.split(sf.col("Decision variable"), "\\_").getItem(3)
    )
    .withColumn("Batch", sf.split(sf.col("Decision variable"), "\\_").getItem(4))
    .withColumn(
        "SalvageDate",
        sf.concat(
            sf.split(sf.col("Decision variable"), "\\_").getItem(5),
            sf.lit("-"),
            sf.split(sf.col("Decision variable"), "\\_").getItem(6),
            sf.lit("-"),
            sf.split(sf.col("Decision variable"), "\\_").getItem(7),
        ).cast("Date"),
    )
    .withColumn(
        "ArrivalDate",
        sf.concat(
            sf.split(sf.col("Decision variable"), "\\_").getItem(8),
            sf.lit("-"),
            sf.split(sf.col("Decision variable"), "\\_").getItem(9),
            sf.lit("-"),
            sf.split(sf.col("Decision variable"), "\\_").getItem(10),
        ).cast("Date"),
    )
    .withColumn("MaterialID", sf.split(sf.col("Decision variable"), "\\_").getItem(11))
    .withColumnRenamed("Value", "QtyMoved")
)

In [0]:
# joining the recommended batch with atrisk salvage inv table so we can have all the info in final output
# some other columns are also calculated here
inv_movement = AtRiskandSalvageInventory.join(
    STO_batch_qty,
    (
        (AtRiskandSalvageInventory["Location"] == STO_batch_qty["SourceDC"])
        & (AtRiskandSalvageInventory["MaterialID"] == STO_batch_qty["MaterialID"])
        & (AtRiskandSalvageInventory["Batch"] == STO_batch_qty["Batch"])
    ),
    "left",
).select(
    AtRiskandSalvageInventory["*"],
    STO_batch_qty.SourceDC,
    STO_batch_qty.DestinationDC,
    STO_batch_qty.QtyMoved,
    STO_batch_qty.ArrivalDate,
)


inv_movement = (
    inv_movement.withColumn(
        "QtyMovedInPallet", sf.col("QtyMoved") / sf.col("ProductPlanningUnitsPerPallet")
    )
    .withColumn(
        "QtyMovedInCase", sf.col("QtyMoved") / sf.col("ProductPlanningUnitsPerCase")
    )
    .withColumn(
        "MovedValue",
        sf.col("QtyMoved")
        * sf.coalesce("Price", "ProductStandardCost", "CostofGoodsSold"),
    )
    .filter(sf.col("DestinationDC").isNotNull())
    .withColumn(
        "CostPerUOM", sf.coalesce("Price", "ProductStandardCost", "CostofGoodsSold")
    )
    .withColumn("MovedWeight", sf.col("QtyMoved") * sf.col("ProductSalesGrossWeight"))
    .withColumn(
        "MovedFootprint", sf.col("QtyMovedInPallet") / sf.col("StackabilityFactor")
    )
    # calculating the weight and footprint utilzation considering the maximum capacity of weight = 42000 and footprint = 30
    .withColumn("WeightUtilizationPerc", sf.col("MovedWeight") / 42000)
    .withColumn("FootprintUtilizationPerc", sf.col("MovedFootprint") / 30)
)

In [0]:
# calculating the shipping cost, distance and leadtime for the recommendations
inv_movement = inv_movement.join(
    ShippingCost,
    (inv_movement["Location"] == ShippingCost["Source"])
    & (inv_movement["DestinationDC"] == ShippingCost["Destination"]),
    "left",
).select(
    inv_movement["*"],
    ShippingCost["Distance"],
    ShippingCost["ShippingCost"].alias("StandardShippingCost"),
    ShippingCost["leadtime"].cast("int").alias("LeadTime"),
)

inv_movement = inv_movement.withColumn(
    "DispatchDate", sf.col("ArrivalDate") - sf.col("LeadTime")
).withColumn(
    "ShippingCost",
    sf.col("StandardShippingCost")
    * sf.greatest(sf.col("WeightUtilizationPerc"), sf.col("FootprintUtilizationPerc")),
)

In [0]:
try:
    APO_Data_final = spark.read.format("delta").load(input_path + "APO_Data")
except Exception as e:
    print(f"Error in loading APO_Data data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")
APO_Data_final = APO_Data_final.filter(sf.col("Report_Run_Date") == ModelRunDate)

# reading the apo data and calculating the average future demand as the average of all upcoming days. this avg demand will be used to calculate DOH after the qty movement
if Scenerio == "Scenerio1":
    APO_Data_final = APO_Data_final.withColumn(
        "avg_future_demand",
        sf.avg(
            sf.col("CDP")
            + sf.col("Dep_Dem")
            + sf.col("MTO")
            + sf.col("Sales")
            + sf.col("NDR")
            + sf.col("ODP")
            + sf.col("OLP")
        ).over(
            Window.partitionBy("Location", "Product")
            .orderBy("Date")
            .rowsBetween(0, Window.unboundedFollowing)
        ),
    )
elif Scenerio == "Scenerio2":
    APO_Data_final = APO_Data_final.withColumn(
        "avg_future_demand",
        sf.avg(sf.col("CDP") + sf.col("Dep_Dem") + sf.col("Sales")).over(
            Window.partitionBy("Location", "Product")
            .orderBy("Date")
            .rowsBetween(0, Window.unboundedFollowing)
        ),
    )

In [0]:
# joining with APO data so the demand between arrival date of the recommended and the salvage date of the recommended batch can be displayed there
batch_level_output = inv_movement.join(
    APO_Data_final,
    (
        (inv_movement["MaterialID"] == APO_Data_final["Product"])
        & (inv_movement["DestinationDC"] == APO_Data_final["Location"])
        & (inv_movement["SalvageDate"] > APO_Data_final["Date"])
        & (inv_movement["ArrivalDate"] < APO_Data_final["Date"])
    ),
    "left",
).select(
    inv_movement["*"],
    APO_Data_final.Ref_Dem,
    APO_Data_final.CDP,
    APO_Data_final.Dep_Dem,
    APO_Data_final.MTO,
    APO_Data_final.Sales,
    APO_Data_final.NDR,
    APO_Data_final.ODP,
    APO_Data_final.OLP,
    APO_Data_final.ASAP_Demand,
)

batch_level_output = batch_level_output.groupby(inv_movement.columns).agg(
    sf.sum("Ref_Dem").alias("Ref_Dem"),
    sf.sum("CDP").alias("CDP"),
    sf.sum("Dep_Dem").alias("Dep_Dem"),
    sf.sum("MTO").alias("MTO"),
    sf.sum("Sales").alias("Sales"),
    sf.sum("NDR").alias("NDR"),
    sf.sum("ODP").alias("ODP"),
    sf.sum("OLP").alias("OLP"),
    sf.sum("ASAP_Demand").alias("ASAP_Demand"),
)

# calculating the actual DOH, which will be for the earliest date in the APO data
days_on_hand = APO_Data_final.groupby("Product", "Location").agg(
    sf.min("Date").alias("Date")
)
days_on_hand = APO_Data_final.select("Product", "Location", "Date", "Cover").join(
    days_on_hand, ["Product", "Location", "Date"]
)

# for the DOH at source actual DOH will be used
days_on_hand_source = days_on_hand.select(
    sf.col("Location").alias("SourceDC"),
    sf.col("Product").alias("MaterialID"),
    sf.col("Cover").alias("DOHAtSource"),
)

# for the Destination DOH on the arrival Date will be used. and also the DOH after move will be calculated using closing Stock , recommended Qty and Avg Demand
days_on_hand_dest = APO_Data_final.select(
    sf.col("Location").alias("DestinationDC"),
    sf.col("Product").alias("MaterialID"),
    sf.col("Date").alias("ArrivalDate"),
    sf.col("Cover").alias("DOHAtDestination"),
    sf.col("Clos_Stock").alias("ClosingStockAtDestination"),
    sf.col("avg_future_demand").alias("avg_future_demand"),
)
batch_level_output = (
    batch_level_output.join(days_on_hand_source, ["MaterialID", "SourceDC"], "left")
    .join(days_on_hand_dest, ["MaterialID", "DestinationDC", "ArrivalDate"], "left")
    .withColumn(
        "DOHAtDest_with_movedqty",
        (sf.col("DOHAtDestination"))
        + (sf.col("QtyMoved") / sf.col("avg_future_demand")),
    )
)

In [0]:
# adding the maximum stock coverage Days or Maximum DOH column in the output
days_on_hand1 = APO_Data_final.groupby("Product", "Location").agg(
    sf.min("Date").alias("Date")
)
days_on_hand1 = APO_Data_final.select("Product", "Location", "Date", "MaxCover").join(
    days_on_hand1, ["Product", "Location", "Date"]
)
days_on_hand_dest1 = days_on_hand1.select(
    sf.col("Location").alias("DestinationDC"),
    sf.col("Product").alias("MaterialID"),
    sf.col("MaxCover").alias("Max_Stock_Cover_days_Destination"),
)
batch_level_output = batch_level_output.join(
    days_on_hand_dest1, ["MaterialID", "DestinationDC"], "left"
)

batch_level_output = (
    batch_level_output.withColumn(
        "MaxRecQty",
        (sf.col("Max_Stock_Cover_days_Destination") - sf.col("DOHAtDestination"))
        * sf.col("avg_future_demand"),
    )
    .withColumn(
        "MaxFullPalletRec",
        (sf.col("MaxRecQty") / sf.col("ProductPlanningUnitsPerPallet")),
    )
    .withColumn("MaxFullPalletRec", sf.greatest(sf.col("MaxFullPalletRec"), sf.lit(0)))
)

###Batch level Output

In [0]:
# Add a new column 'Week' to the DataFrame, extracting the week of the year from 'SalvageDate'
batch_level_output = (
    batch_level_output.withColumn("Week", sf.weekofyear("SalvageDate"))
    .withColumn("FullPalletMove", sf.round("QtyMovedInPallet"))
    .withColumn(
        "FullPalletRecommendation_Cases",
        sf.col("QtyMovedInCase")
        / (sf.col("QtyMovedInPallet"))
        * sf.col("FullPalletMove")
    )
)

In [0]:
# adding the region column for th esource and destination and creating the ISLocal Column, which will tell if any move is in the same region or it is in inter Region
Region = spark.read.format("delta").load(
    "abfss://root@"
    + stgAct
    + ".dfs.core.windows.net"
    + folder_path
    + "Inbound/NPWRegions"
)

batch_level_output = batch_level_output.withColumnRenamed("Region", "Region_old")
batch_level_output = (
    batch_level_output
    .join(Region, batch_level_output["SourceDC"] == Region["Location"], "left")
    .select(batch_level_output["*"], Region["Region"])
)

batch_level_output = batch_level_output.withColumn(
    "Region", sf.coalesce(sf.col("Region"), sf.col("Region_old"))
).drop(*["Region_old"])

dest_region = Region.select(
    sf.col("Location").alias("Destination_Location"),
    sf.col("Region").alias("Destination_Region"),
)
batch_level_output = (
    batch_level_output.join(
        dest_region,
        batch_level_output["DestinationDC"] == dest_region["Destination_Location"],
        "left",
    )
    .select(batch_level_output["*"], dest_region["Destination_Region"])
    .withColumn(
        "IsLocalMove",
        sf.when(
            sf.col("Region") == sf.col("Destination_Region"), sf.lit("Y")
        ).otherwise(sf.lit("N")),
    )
)

In [0]:
# Add a new column 'Report_Run_Date' and Set values to the current date and then extract to Year, Month, Day
batch_level_output = (
    batch_level_output.withColumn("Report_Run_Date", sf.current_date())
    .withColumn("Year", sf.substring("Report_Run_Date", 1, 4))
    .withColumn("Month", sf.substring("Report_Run_Date", 6, 2))
    .withColumn("Day", sf.substring("Report_Run_Date", 9, 2))
)

try:
    batch_level_output_path = destination_path + "BatchLevelOutput_inter"
    batch_level_output_delta_table = DeltaTable.forPath(spark, batch_level_output_path)
    batch_level_output_delta_table.delete(sf.col("Report_Run_Date") == ModelRunDate)
except:
    print("table does not exist")

# Save the DataFrame in file on ADLS
batch_level_output.write.format("delta").mode("append").option(
    "overwriteSchema", "true"
).option("mergeSchema", "true").partitionBy("Year", "Month", "Day").save(
    destination_path + "BatchLevelOutput_inter"
)

In [0]:
# filtering the recommendations based on the minimum recommended qty for a lane of 'Location','MaterialID','DispatchDate','DestinationDC'
# also filtering the recommendations for the CPM threshold
more_than_one_pallet = (
    batch_level_output.groupby(
        "Location", "MaterialID", "DispatchDate", "DestinationDC"
    )
    .agg(
        sf.sum("QtyMovedInPallet"),
        sf.sum("FullPalletMove"),
        sf.sum("FullPalletRecommendation_Cases"),
    )
    .filter(sf.col("sum(QtyMovedInPallet)") >= MinRecPalletQty)
)

batch_level_output1 = (
    batch_level_output.join(
        more_than_one_pallet,
        ["Location", "MaterialID", "DispatchDate", "DestinationDC"],
    )
    .select(batch_level_output.columns)
    .filter(col("ShippingCost") / col("MovedValue") <= CPMThreshold)
)

In [0]:
# creating the ISFTL column which will identify the orders which can be dispatched in the same week after clubbing together for the same lane and fullfill the FTL criteria
batch_level_output1 = batch_level_output1.withColumn(
    "DispatchDateYearWeek",
    (
        sf.when(
            ((sf.month("DispatchDate") == 12) & (sf.weekofyear("DispatchDate") == 1)),
            (sf.year("DispatchDate") + 1) * 100 + sf.weekofyear("DispatchDate"),
        ).otherwise(sf.year("DispatchDate") * 100 + sf.weekofyear("DispatchDate"))
    ),
)


batch_level_output_FTL2 = batch_level_output1.groupby(
    "SourceDC", "DispatchDateYearWeek", "DestinationDC"
).agg(
    sf.sum("MovedWeight").alias("CombinedMovedWeightOnLane"),
    sf.sum("MovedFootprint").alias("CombinedMovedFootprintOnLane"),
)
if Network == "Ambient":
    batch_level_output_FTL2 = batch_level_output_FTL2.withColumn(
        "IsFTL",
        sf.when(
            (sf.col("CombinedMovedWeightOnLane") >= 10000)
            | (sf.col("CombinedMovedFootprintOnLane") >= 10),
            sf.lit("Y"),
        ).otherwise(sf.lit("N")),
    )
elif Network == "Frozen":
    batch_level_output_FTL2 = batch_level_output_FTL2.withColumn(
        "IsFTL",
        sf.when(
            (sf.col("CombinedMovedWeightOnLane") >= 15000)
            | (sf.col("CombinedMovedFootprintOnLane") >= 25),
            sf.lit("Y"),
        ).otherwise(sf.lit("N")),
    )
elif Network == "Chilled":
    batch_level_output_FTL2 = batch_level_output_FTL2.withColumn(
        "IsFTL",
        sf.when(
            (sf.col("CombinedMovedWeightOnLane") >= 15000)
            | (sf.col("CombinedMovedFootprintOnLane") >= 25),
            sf.lit("Y"),
        ).otherwise(sf.lit("N")),
    )
elif Network == "NPW":
    batch_level_output_FTL2 = batch_level_output_FTL2.withColumn(
        "IsFTL",
        sf.when(
            (sf.col("CombinedMovedWeightOnLane") >= 15000)
            | (sf.col("CombinedMovedFootprintOnLane") >= 10),
            sf.lit("Y"),
        ).otherwise(sf.lit("N")),
    )


batch_level_output_FTL3 = batch_level_output_FTL2.withColumn(
    "TotalUtilizedWeight", (sf.col("CombinedMovedWeightOnLane") / 42000 * 100)
).withColumn(
    "TotalUtilizedFootprint", (sf.col("CombinedMovedFootprintOnLane") / 30 * 100)
)

batch_level_output1 = batch_level_output1.join(
    batch_level_output_FTL3,
    ["SourceDC", "DispatchDateYearWeek", "DestinationDC"],
    "left",
).withColumn(
    "FinalUtilization",
    (sf.greatest(sf.col("TotalUtilizedWeight"), sf.col("TotalUtilizedFootprint"))),
)

In [0]:
# Add a new column 'Report_Run_Date' and Set values to the current date
# Add a new columns 'Year' 'Month' 'Day' ,extracting from 'Report_Run_Date'
batch_level_output1 = batch_level_output1.withColumn('Report_Run_Date',sf.current_date()).withColumn('Year',sf.substring('Report_Run_Date',1,4)).withColumn('Month',sf.substring('Report_Run_Date',6,2)).withColumn('Day',sf.substring('Report_Run_Date',9,2))

try:
    # Specify the path of the Delta table
    batch_level_output1_path = destination_path+'BatchLevelOutput'
    # Get the DeltaTable instance for the specified path
    batch_level_output1_delta_table = DeltaTable.forPath(spark, batch_level_output1_path)
    # Delete records where 'Report_Run_Date' equals 'ModelRunDate'
    batch_level_output1_delta_table.delete(sf.col('Report_Run_Date') == ModelRunDate)
except:
    print('table does not exist')

# Save the DataFrame in file on ADLS
batch_level_output1.write.format("delta")\
    .mode('append')\
    .option("overwriteSchema", "true")\
    .option("mergeSchema", "true")\
    .partitionBy("Year","Month","Day")\
    .save(destination_path+'BatchLevelOutput')

### PBI write

In [0]:
# adding the bias type at source and destination for the material
try:
    BiasType = spark.read.parquet(
        "abfss://root@"
        + stgAct
        + ".dfs.core.windows.net"
        + pbi_folder_path
        + "PowerBI/BiasType"
    )
except Exception as e:
    print(f"Error in loading BiasType data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

# select the required columns and Rename.
BiasType_at_source = BiasType.select(
    "MaterialID",
    sf.col("LocationID").alias("SourceDC"),
    sf.col("Bias").alias("BiasAtSource"),
    sf.col("BiasType").alias("BiasTypeAtSource"),
)
BiasType_at_dest = BiasType.select(
    "MaterialID",
    sf.col("LocationID").alias("DestinationDC"),
    sf.col("Bias").alias("BiasAtDestination"),
    sf.col("BiasType").alias("BiasTypeAtDestination"),
)

# join batch_level_output1 with BiasType_at_source & BiasType_at_dest
batch_level_output1 = batch_level_output1.join(
    BiasType_at_source, ["MaterialID", "SourceDC"], "left"
).join(BiasType_at_dest, ["MaterialID", "DestinationDC"], "left")

In [0]:
# calculating the recommended Out Qty for a source, it can be showed along with Total Excess Inv( total Excess and Action Taken on (recommendedQty))
recommended_qty = batch_level_output1.groupby(
    sf.col("SourceDC").alias("Location"), "Batch", "MaterialID"
).agg(sf.sum("QtyMoved").alias("RecommendedQty"))

# reading the Excess Inv table and saving it to PowerBi folder after adding some columns, so it can be used in PBI report
try:
    AtRiskandSalvageInventory_pbi = spark.read.format("delta").load(
        input_path + "AtRiskandSalvageInventory"
    )
except Exception as e:
    print(f"Error in loading AtRiskandSalvageInventory data: {str(e)}")
    raise SystemExit(f"Exiting due to the error: {str(e)}")

AtRiskandSalvageInventory_pbi = (
    AtRiskandSalvageInventory_pbi.filter(sf.col("Report_Run_Date") == ModelRunDate)
    .withColumn("QtyInCases", col("QuantityinUoM") / col("ProductPlanningUnitsPerCase"))
    .withColumn(
        "QtyinPallets", col("QuantityinUoM") / col("ProductPlanningUnitsPerPallet")
    )
    .withColumn("QuantityValue_COGS", col("QuantityinUoM") * col("CostofGoodsSold"))
    .withColumn(
        "QuantityValue_StdCost", col("QuantityinUoM") * col("ProductStandardCost")
    )
)


if Network == "NPW":
    AtRiskandSalvageInventory_pbi = AtRiskandSalvageInventory_pbi.withColumn(
        "QuantityValue_NPS", col("QuantityinUoM") * col("Price")
    )
else:
    AtRiskandSalvageInventory_pbi = AtRiskandSalvageInventory_pbi.withColumn(
        "QuantityValue_NPS", sf.lit(None).cast("double")
    )

AtRiskandSalvageInventory_pbi = (
    AtRiskandSalvageInventory_pbi.join(
        recommended_qty, ["Location", "Batch", "MaterialID"], "left"
    )
    .withColumn(
        "ActionTaken", sf.when(sf.col("RecommendedQty").isNotNull(), sf.lit("Y"))
    )
    .withColumn(
        "MatID-Desc",
        sf.concat(sf.col("MaterialID"), sf.lit(" - "), sf.col("ProductDescription")),
    )
    .withColumn(
        "SnapshotDate", sf.from_utc_timestamp(sf.lit(us_eastern_dt), target_timezone)
    )
)


AtRiskandSalvageInventory_pbi = AtRiskandSalvageInventory_pbi.drop(
    *[
        "AtRiskCategory",
        "Currency",
        "CostofGoodsSold",
        "ProductStandardCost",
        "UniversalBasePrice",
        "QuantityPrice",
        "TotalShelfLife",
        "ShelfLifeRemainingDays",
        "ShelfLifeRemainingPercent",
        "MinimumRemainingShelfLifeDays",
        "MinimumRemainingShelfLifePercent",
        "CasesPerLayer",
        "CasesPerPallet",
        "LayersPerPallet",
        "CaseWeight",
        "StorCond",
        "Subdivision",
        "Division",
        "ProductLevel3CategoryID",
        "ProductPlanningUnitsPerPallet",
        "ProductPlanningUnitsPerCase",
        "StackabilityFactor",
        "ProductSalesGrossWeight",
        "ProductWeightUnitOfMeasure",
        "ProductLevel3CategoryDescription",
        "Price",
    ]
)

# AtRiskandSalvageInventory_pbi saving ADLS
AtRiskandSalvageInventory_pbi.write.format("parquet").mode("overwrite").save(
    AtRiskAndSalvageInv_PBIPath
)

In [0]:
# saving the required columns in PBI folder for PBI report
batch_level_output_pbi = batch_level_output1.drop(
    *[
        "AtRiskCategory",
        "Location",
        "Currency",
        "UniversalBasePrice",
        "QuantityValue",
        "QuantityPrice",
        "DaystoSalvage",
        "WeekstoSalvage",
        "TotalShelfLife",
        "ShelfLifeRemainingDays",
        "ShelfLifeRemainingPercent",
        "MinimumRemainingShelfLifeDays",
        "CasesPerLayer",
        "CasesPerPallet",
        "LayersPerPallet",
        "CaseWeight",
        "ProductLevel3CategoryDescription",
        "Price",
        "Freshness",
        "CostPerUOM",
        "Distance",
        "StandardShippingCost",
        "LeatTime",
        "ASAP_Demand",
        "ClosingStockAtDestination",
        "avg_future_demand",
        "DualSKUNumber",
        "Region",
        "Destination_Region",
        "TotalUtilizedWeight",
        "TotalUtilizedFootprint",
        "ProductDescription",
        "CostofGoodsSold",
        "ProductStandardCost",
        "StorCond",
        "Subdivision",
        "Division",
        "ProductLevel3CategoryID",
        "ProductPlanningUnitsPerPallet",
        "ProductPlanningUnitsPerCase",
        "StackabilityFactor",
        "ProductSalesGrossWeight",
    ]
).withColumnRenamed("FullPalletRecommendation_Cases", "FullPalletRecommendation(Cases)")

# Add a new column 'SnapshotDate' based on 'us_eastern_dt' timestamp column
batch_level_output_pbi = batch_level_output_pbi.withColumn(
    "SnapshotDate", sf.from_utc_timestamp(sf.lit(us_eastern_dt), target_timezone)
)

# batch_level_output_pbi saving to ADLS
batch_level_output_pbi.write.format("parquet").mode("overwrite").save(
    BatchLevelOutput_PBIPath
)

In [0]:
# Check if Network is 'Chilled'
if Network == "Chilled":
    # Check if Scenerio is 'Scenerio1'
    if Scenerio == "Scenerio1":
        # Display the selected columns for batch_level_output1
        batch_level_output1.select(
            "Division",
            "ProductLevel3CategoryDescription",
            "MaterialID",
            "Batch",
            batch_level_output1["Location"].alias("SourceDC"),
            "SalvageDate",
            "Week",
            "DispatchDate",
            "DestinationDC",
            "ArrivalDate",
            batch_level_output1["QuantityinUoM"].alias("Excess Inv at Source"),
            "QtyMoved",
            "UoM",
            "QtyMovedInPallet",
            "QtyMovedInCase",
            "MovedValue",
            "FullPalletMove",
            "FullPalletRecommendation_Cases",
            "CDP",
            "Dep_Dem",
            "MTO",
            "Sales",
            "NDR",
            "ODP",
            "OLP",
            "ShippingCost",
            "MovedWeight",
            "MovedFootprint",
            "ProductWeightUnitOfMeasure",
            "ProductionDate",
            "AvailableDate",
            "ConsumerSLDate",
            "DOHAtSource",
            "DOHAtDestination",
            "Report_Run_Date",
        ).display()
    # Display the selected columns for batch_level_output1 (Scenerio other than 'Scenerio1')
    else:
        batch_level_output1.select(
            "Division",
            "ProductLevel3CategoryDescription",
            "MaterialID",
            "Batch",
            batch_level_output1["Location"].alias("SourceDC"),
            "SalvageDate",
            "Week",
            "DispatchDate",
            "DestinationDC",
            "ArrivalDate",
            batch_level_output1["QuantityinUoM"].alias("Excess Inv at Source"),
            "QtyMoved",
            "UoM",
            "QtyMovedInPallet",
            "QtyMovedInCase",
            "MovedValue",
            "FullPalletMove",
            "FullPalletRecommendation_Cases",
            "CDP",
            "Dep_Dem",
            "Sales",
            "ShippingCost",
            "MovedWeight",
            "MovedFootprint",
            "ProductWeightUnitOfMeasure",
            "ProductionDate",
            "AvailableDate",
            "ConsumerSLDate",
            "DOHAtSource",
            "DOHAtDestination",
            "Report_Run_Date",
        ).display()
else:
    # Check if Scenerio is 'Scenerio1'
    if Scenerio == "Scenerio1":
        # Display the selected columns for batch_level_output1
        batch_level_output1.select(
            "Division",
            "ProductLevel3CategoryDescription",
            "MaterialID",
            "Batch",
            batch_level_output1["Location"].alias("SourceDC"),
            "SalvageDate",
            "Week",
            "DispatchDate",
            "DestinationDC",
            "ArrivalDate",
            batch_level_output1["QuantityinUoM"].alias("Excess Inv at Source"),
            "QtyMoved",
            "UoM",
            "QtyMovedInPallet",
            "QtyMovedInCase",
            "MovedValue",
            "FullPalletMove",
            "FullPalletRecommendation_Cases",
            "CDP",
            "Dep_Dem",
            "MTO",
            "Sales",
            "NDR",
            "ODP",
            "OLP",
            "ShippingCost",
            "MovedWeight",
            "MovedFootprint",
            "ProductWeightUnitOfMeasure",
            "ProductionDate",
            "AvailableDate",
            "ConsumerSLDate",
            "DOHAtSource",
            "DOHAtDestination",
            "Report_Run_Date",
        ).display()
    else:
        # Display the selected columns for batch_level_output1 (Scenerio other than 'Scenerio1')
        batch_level_output1.select(
            "Division",
            "ProductLevel3CategoryDescription",
            "MaterialID",
            "Batch",
            batch_level_output1["Location"].alias("SourceDC"),
            "SalvageDate",
            "Week",
            "DispatchDate",
            "DestinationDC",
            "ArrivalDate",
            batch_level_output1["QuantityinUoM"].alias("Excess Inv at Source"),
            "QtyMoved",
            "UoM",
            "QtyMovedInPallet",
            "QtyMovedInCase",
            "MovedValue",
            "FullPalletMove",
            "FullPalletRecommendation_Cases",
            "CDP",
            "Dep_Dem",
            "Sales",
            "ShippingCost",
            "MovedWeight",
            "MovedFootprint",
            "ProductWeightUnitOfMeasure",
            "ProductionDate",
            "AvailableDate",
            "ConsumerSLDate",
            "DOHAtSource",
            "DOHAtDestination",
            "Report_Run_Date",
        ).display()

Division,ProductLevel3CategoryDescription,MaterialID,Batch,SourceDC,SalvageDate,Week,DispatchDate,DestinationDC,ArrivalDate,Excess Inv at Source,QtyMoved,UoM,QtyMovedInPallet,QtyMovedInCase,MovedValue,FullPalletMove,FullPalletRecommendation_Cases,CDP,Dep_Dem,Sales,ShippingCost,MovedWeight,MovedFootprint,ProductWeightUnitOfMeasure,ProductionDate,AvailableDate,ConsumerSLDate,DOHAtSource,DOHAtDestination,Report_Run_Date
PIZZA,US JKS PIZZA,12403930,4026525821,5282,2024-05-25,21,2024-03-15,5514,2024-03-16,684.0,684.0,CS,19.0,684.0,25975.400004000003,19.0,684.0,256718.6783,0.0,0.0,575.6999999999999,20314.8,19.0,LB,2024-01-26,2024-01-29,2024-08-23,999.0,65.2509,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3219544617,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,748.0,748.0,CS,20.77777777777778,748.0,25801.857576,21.0,756.0,660766.8592,0.0,0.0,1294.374819047619,19597.6,10.38888888888889,LB,2023-08-07,2023-08-14,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3235544617,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,677.0,677.0,CS,18.80555555555556,677.0,23352.750774,19.0,684.0,660766.8592,0.0,0.0,1171.513038095238,17737.399999999998,9.40277777777778,LB,2023-08-23,2023-08-30,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3221544614,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,1211.0,1211.0,CS,33.638888888888886,1211.0,41772.793482,34.0,1224.0,660766.8592,0.0,0.0,2095.572066666667,31728.2,16.819444444444443,LB,2023-08-09,2023-08-16,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3232544617,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,2564.0,2564.0,CS,71.22222222222223,2564.0,88443.80056799999,71.0,2556.0,660766.8592,0.0,0.0,4436.867695238095,67176.8,35.611111111111114,LB,2023-08-20,2023-08-27,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3216544614,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,1211.0,1211.0,CS,33.638888888888886,1211.0,41772.793482,34.0,1224.0,660766.8592,0.0,0.0,2095.572066666667,31728.2,16.819444444444443,LB,2023-08-04,2023-08-11,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3233544617,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,214.0,214.0,CS,5.944444444444445,214.0,7381.814867999999,6.0,216.0,660766.8592,0.0,0.0,370.3157904761905,5606.8,2.9722222222222223,LB,2023-08-21,2023-08-28,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3213544614,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,1424.0,1424.0,CS,39.55555555555556,1424.0,49120.113888,40.0,1440.0,660766.8592,0.0,0.0,2464.1574095238093,37308.8,19.77777777777778,LB,2023-08-01,2023-08-08,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3230544617,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,499.0,499.0,CS,13.86111111111111,499.0,17212.736538,14.0,504.0,660766.8592,0.0,0.0,863.4933619047619,13073.8,6.930555555555555,LB,2023-08-18,2023-08-25,2024-10-31,999.0,46.3684,2024-03-08
SNACKING,US HHFG - WAREHOUSE CLUB,12505148,3217544617,5275,2024-07-03,27,2024-04-24,5070,2024-04-27,321.0,321.0,CS,8.916666666666666,321.0,11072.722302,9.0,324.0,660766.8592,0.0,0.0,555.4736857142856,8410.199999999999,4.458333333333333,LB,2023-08-05,2023-08-12,2024-10-31,999.0,46.3684,2024-03-08
