In [1]:
#!pip install pyspark

In [2]:
import findspark

findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
#Successfully installed py4j-0.10.9.5 pyspark-3.3.2

In [5]:
# HADOOP_HOME = C:\Hadoop
# JAVA_HOME = C:\Java\jdk-11.0.6
# PYSPARK_DRIVER_PYTHON = jupyter
# PYSPARK_DRIVER_PYTHON_OPTS = notebook
# PYSPARK_PYTHON = python

In [6]:
# Reuse the active Spark session if there is one; otherwise start a new one with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [7]:
spark

In [8]:
# Raw daily inventory extract: first row is the header, column types are inferred from the data.
df_rw = (
    spark.read
    .option('header', 'true')
    .csv('Dataset//inventory_final_data.csv', inferSchema=True)
)

In [9]:
df_rw.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- sku: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- total_sales_units: integer (nullable = true)
 |-- on_hand_inventory_units: double (nullable = true)
 |-- replenishment_units: integer (nullable = true)
 |-- inventory_pipeline: integer (nullable = true)
 |-- units_in_transit: integer (nullable = true)
 |-- units_in_dc: integer (nullable = true)
 |-- units_on_order: integer (nullable = true)
 |-- units_under_promotion: integer (nullable = true)
 |-- shelf_capacity: double (nullable = true)
 |-- promotion_flag: integer (nullable = true)
 |-- replenishment_flag: integer (nullable = true)



In [10]:
from pyspark.sql.functions import col, to_date

# The CSV reader inferred `date` as a timestamp; narrow it to a calendar date.
# The column is referenced as 'Date' although the schema spells it 'date'
# (NOTE(review): this relies on Spark's default case-insensitive column resolution).
df_rw = df_rw.withColumn(
    'date',
    to_date(col('Date')),
)

In [11]:
df_rw.printSchema()

root
 |-- date: date (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- sku: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- total_sales_units: integer (nullable = true)
 |-- on_hand_inventory_units: double (nullable = true)
 |-- replenishment_units: integer (nullable = true)
 |-- inventory_pipeline: integer (nullable = true)
 |-- units_in_transit: integer (nullable = true)
 |-- units_in_dc: integer (nullable = true)
 |-- units_on_order: integer (nullable = true)
 |-- units_under_promotion: integer (nullable = true)
 |-- shelf_capacity: double (nullable = true)
 |-- promotion_flag: integer (nullable = true)
 |-- replenishment_flag: integer (nullable = true)



In [12]:
# Vendor lead-time reference data: header row present, column types inferred.
df_vendor = spark.read.csv(
    'Dataset//Vendor_data.csv',
    header=True,
    inferSchema=True,
)

In [13]:
df_vendor.show()

+---+---------+-------------+--------+-------+---------------+--------------------+------------------+
|key|vendor_id|sub_vendor_id|store_id|item_id|LEAD_TIME_IN_DC|LEAD_TIME_IN_TRANSIT|LEAD_TIME_ON_ORDER|
+---+---------+-------------+--------+-------+---------------+--------------------+------------------+
|  1|        1|         1001|    1763|      1|              4|                   3|                 7|
|  2|        1|         1001|    1763|      2|              2|                   1|                 7|
|  3|        1|         1002|    1843|      2|              2|                   1|                 7|
|  4|        1|         1001|    1763|      3|              4|                   3|                 7|
|  5|        2|         2016|     486|      6|              2|                   1|                 8|
|  6|        2|         2073|    1587|      7|              3|                   2|                 9|
|  7|        2|         2087|    1556|      8|              4|           

# Step 2: Identify Phantom Inventory
#### Calculate Phantom Inventory

In [14]:
# !pip install pandas

In [15]:
import pyspark.sql.functions as f
from pyspark.sql.types import *
 
import pandas as pd

In [16]:

# Phantom inventory: the gap between the on-hand units implied by the previous
# day's balance, replenishment and sales, and the on-hand units actually reported.
phantom_inventory = (
    df_rw

    # average unit sales per store-SKU over all dates up to the current one
    # (ordered window without an explicit frame)
    .withColumn(
        'daily_sales_units',
        f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date)'),
    )

    # opening balance: the previous row's reported on-hand units; when there is no
    # previous row, back it out of the current day's figures instead
    .withColumn(
        'start_on_hand_units',
        f.expr('''
      COALESCE( 
        LAG(on_hand_inventory_units, 1) OVER(PARTITION BY store_id, sku ORDER BY date), 
        on_hand_inventory_units + total_sales_units - replenishment_units
        )
        '''),
    )

    # closing balance: reported on-hand units, with missing values treated as zero
    .withColumn(
        'end_on_hand_units',
        f.expr('COALESCE(on_hand_inventory_units, 0)'),
    )

    # phantom inventory = (opening balance + replenished units - sold units) - closing balance
    .withColumn(
        'phantom_inventory',
        f.expr('start_on_hand_units + replenishment_units - total_sales_units - end_on_hand_units'),
    )

    # raise the indicator only when the absolute gap is non-zero and strictly
    # greater than 5 times the average daily sales
    .withColumn(
        'phantom_inventory_ind',
        f.expr('''
      CASE
        WHEN phantom_inventory <> 0 AND ABS(phantom_inventory) > 5 * daily_sales_units THEN 1 
        ELSE 0 
        END'''),
    )

    # keep only the keys, the inputs and the derived phantom-inventory fields
    .select([
        'date',
        'store_id',
        'sku',
        'daily_sales_units',
        'start_on_hand_units',
        'replenishment_units',
        'total_sales_units',
        'end_on_hand_units',
        'phantom_inventory',
        'phantom_inventory_ind',
    ])
)
 

In [17]:
phantom_inventory.show()

+----------+--------+---+------------------+-------------------+-------------------+-----------------+-----------------+-----------------+---------------------+
|      date|store_id|sku| daily_sales_units|start_on_hand_units|replenishment_units|total_sales_units|end_on_hand_units|phantom_inventory|phantom_inventory_ind|
+----------+--------+---+------------------+-------------------+-------------------+-----------------+-----------------+-----------------+---------------------+
|2019-01-01|      63| 57|               0.0|               null|                  0|                0|              0.0|             null|                    0|
|2019-01-02|      63| 57|               0.0|                8.0|                  0|                0|              8.0|              0.0|                    0|
|2019-01-03|      63| 57|               0.0|                8.0|                  0|                0|              8.0|              0.0|                    0|
|2019-01-04|      63| 57|         

## Step 3: Identify Out of Stocks
#### Join Inventory to Phantom Inventory

In [18]:

# attach each day's phantom-inventory figure to its inventory record
inventory_with_pi = (
    df_rw.alias('inv')

    # default (inner) join on the store / SKU / date key
    .join(
        phantom_inventory.alias('pi'),
        on=['store_id', 'sku', 'date'],
    )

    # carry forward only the fields used by the later steps
    .selectExpr(
        'inv.store_id',
        'inv.sku',
        'inv.date',
        'inv.product_category',
        'inv.on_hand_inventory_units',
        'inv.total_sales_units',
        'inv.replenishment_units',
        'inv.replenishment_flag',
        'inv.units_on_order',
        'inv.units_in_transit',
        'inv.units_in_dc',
        'pi.phantom_inventory',
    )

    # clean-up so the arithmetic downstream is well defined:
    # missing phantom inventory counts as zero
    .withColumn(
        'phantom_inventory',
        f.expr('COALESCE(phantom_inventory, 0)'),
    )
    # negative on-hand counts are floored at zero (nulls are left as they are)
    .withColumn(
        'on_hand_inventory_units',
        f.expr('''
              CASE 
                WHEN on_hand_inventory_units < 0 THEN 0 
                ELSE on_hand_inventory_units 
                END'''),
    )
    # replenishment units only count on rows flagged as replenishment days
    .withColumn(
        'replenishment_units',
        f.expr('''
              CASE 
                WHEN replenishment_flag = 1 THEN replenishment_units
                ELSE 0 
                END'''),
    )

    # seed column for the estimated on-hand inventory, filled in by the per-group pandas step
    # NOTE(review): an integer literal makes this column an int while the on-hand and
    # phantom inputs are doubles; confirm fractional estimates are never expected.
    .withColumn(
        'estimated_on_hand_inventory',
        f.lit(0),
    )
)
 


In [19]:
inventory_with_pi.show()

+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+
|store_id|sku|      date|product_category|on_hand_inventory_units|total_sales_units|replenishment_units|replenishment_flag|units_on_order|units_in_transit|units_in_dc|phantom_inventory|estimated_on_hand_inventory|
+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+
|      63| 57|2019-01-01|     Category 04|                   null|                0|                  0|                 0|             0|               0|          0|              0.0|                          0|
|      63| 57|2019-01-02|     Category 04|                    8.0|                0|                  0|                 0|             8|      

In [20]:
inventory_with_pi.head(5)

[Row(store_id=63, sku=57, date=datetime.date(2019, 1, 1), product_category='Category 04', on_hand_inventory_units=None, total_sales_units=0, replenishment_units=0, replenishment_flag=0, units_on_order=0, units_in_transit=0, units_in_dc=0, phantom_inventory=0.0, estimated_on_hand_inventory=0),
 Row(store_id=63, sku=57, date=datetime.date(2019, 1, 2), product_category='Category 04', on_hand_inventory_units=8.0, total_sales_units=0, replenishment_units=0, replenishment_flag=0, units_on_order=8, units_in_transit=0, units_in_dc=0, phantom_inventory=0.0, estimated_on_hand_inventory=0),
 Row(store_id=63, sku=57, date=datetime.date(2019, 1, 3), product_category='Category 04', on_hand_inventory_units=8.0, total_sales_units=0, replenishment_units=0, replenishment_flag=0, units_on_order=0, units_in_transit=0, units_in_dc=0, phantom_inventory=0.0, estimated_on_hand_inventory=0),
 Row(store_id=63, sku=57, date=datetime.date(2019, 1, 4), product_category='Category 04', on_hand_inventory_units=7.0, t

## Estimate On-Hand Inventory

In [21]:
#!pip install pyarrow

In [22]:
#!conda install -c cyclus java-jdk

In [23]:

# iterate over inventory to calculate current inventory levels
def get_estimated_inventory(inventory_pd: pd.DataFrame) -> pd.DataFrame:
    """Estimate end-of-day on-hand inventory for one group of inventory records.

    The records are ordered by ``date`` and walked once. For every row after the
    first, the estimate is::

        previous estimate + replenishment_units - total_sales_units - phantom_inventory

    floored at zero and capped at the row's reported ``on_hand_inventory_units``.
    The first row keeps whatever seed value it already holds in
    ``estimated_on_hand_inventory``. A missing (NaN) reported on-hand value never
    wins the cap comparison, so the estimate is left uncapped on such rows.

    Parameters
    ----------
    inventory_pd : pd.DataFrame
        Must contain the columns ``date``, ``replenishment_units``,
        ``total_sales_units``, ``phantom_inventory``, ``on_hand_inventory_units``
        and ``estimated_on_hand_inventory``.

    Returns
    -------
    pd.DataFrame
        A date-sorted copy of the input with ``estimated_on_hand_inventory``
        filled in. The input frame itself is left untouched. The column may come
        back as float when any computed estimate is a float.
    """
    # Sort into a new frame rather than in place, so the caller's data is not mutated.
    ordered = inventory_pd.sort_values('date')

    # Pull the columns out once as plain Python lists. The recurrence depends on the
    # previous estimate, so it cannot be vectorised, but list access avoids the
    # per-element pandas indexing and the chained assignment
    # (`frame.col.iloc[i] = ...`), which does not reliably write back to the frame.
    replenished = ordered['replenishment_units'].tolist()
    sold = ordered['total_sales_units'].tolist()
    phantom = ordered['phantom_inventory'].tolist()
    reported_on_hand = ordered['on_hand_inventory_units'].tolist()
    estimates = ordered['estimated_on_hand_inventory'].tolist()

    for i in range(1, len(estimates)):
        # a negative carry-over (only possible from the seed row) is treated as empty stock
        previous_inv = estimates[i - 1]
        if previous_inv < 0:
            previous_inv = 0

        estimate = previous_inv + replenished[i] - sold[i] - phantom[i]

        # never below zero, never above what was actually reported on hand
        if estimate < 0:
            estimate = 0
        if estimate > reported_on_hand[i]:
            estimate = reported_on_hand[i]

        estimates[i] = estimate

    # Write the whole column back in one assignment; skipped when nothing was
    # computed so empty and single-row groups keep their original dtype.
    if len(estimates) > 1:
        ordered['estimated_on_hand_inventory'] = estimates

    return ordered

 


#### calculate estimated on-hand inventory

In [24]:
#!pip install pyarrow

In [25]:

# run the sequential on-hand estimate separately for every store-SKU group;
# the function returns the same columns it receives, so the input schema is reused
inventory_on_hand = (
    inventory_with_pi
    .groupby('store_id', 'sku')
    .applyInPandas(
        get_estimated_inventory,
        schema=inventory_with_pi.schema,
    )
)
 


In [26]:
display(inventory_on_hand)

DataFrame[store_id: int, sku: int, date: date, product_category: string, on_hand_inventory_units: double, total_sales_units: int, replenishment_units: int, replenishment_flag: int, units_on_order: int, units_in_transit: int, units_in_dc: int, phantom_inventory: double, estimated_on_hand_inventory: int]

In [27]:
#-*- coding: utf-8 -*-

In [28]:
inventory_on_hand.show()

+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+
|store_id|sku|      date|product_category|on_hand_inventory_units|total_sales_units|replenishment_units|replenishment_flag|units_on_order|units_in_transit|units_in_dc|phantom_inventory|estimated_on_hand_inventory|
+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+
|      63| 57|2019-01-01|     Category 04|                   null|                0|                  0|                 0|             0|               0|          0|              0.0|                          0|
|      63| 57|2019-01-02|     Category 04|                    8.0|                0|                  0|                 0|             8|      

### Calculate Average On-Hand and Average Daily Sales Metrics

In [30]:
"""
inventory_with_metrics = (
  inventory_on_hand
    
    # AVERAGE ON-HAND UNITS PRIOR TO REPLENISHMENT
    # ------------------------------------------------------------------------------------
    # getting prior day's on-hand inventory only for the days with replenishment
    .withColumn('prior_inventory', f.expr('LAG(estimated_on_hand_inventory, 1) OVER(PARTITION BY store_id, sku ORDER BY date)'))
    .withColumn('prior_inventory', f.expr('COALESCE(prior_inventory,0)'))
    .withColumn('prior_inventory', f.expr('CASE WHEN replenishment_flag=1 THEN prior_inventory ELSE 0 END'))
  
    # calculating rolling average of prior day's on-hand inventory for days with replenishment (over last 90 days)
    .withColumn('rolling_stock_onhand', f.expr('''
      SUM(prior_inventory) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) /
      (SUM(replenishment_flag) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) + 1)
      '''
      ))
    .withColumn('rolling_min_expected_stock', f.expr('CASE WHEN replenishment_flag != 1 THEN 0 ELSE rolling_stock_onhand END'))
    .withColumn('rolling_min_expected_stock', f.expr('COALESCE(rolling_min_expected_stock,0)'))
    
    # fixing the inventory values for all dates through forward fill
    .withColumn('min_expected_stock', f.expr('NULLIF(rolling_min_expected_stock,0)'))
    .withColumn('min_expected_stock', f.expr('LAST(min_expected_stock, True) OVER(PARTITION BY store_id, sku ORDER BY date)'))
    .withColumn('min_expected_stock', f.expr('COALESCE(min_expected_stock, 0)'))
    # ------------------------------------------------------------------------------------
  
    # AVERAGE DAILY SALES
    # ------------------------------------------------------------------------------------
    # getting daily sales velocity
    .withColumn('daily_sales_units', f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW)'))
    .withColumn('daily_sales_units', f.expr('LAST(daily_sales_units, True) OVER(PARTITION BY store_id, sku ORDER BY date)'))
    .withColumn('daily_sales_units', f.expr('COALESCE(daily_sales_units, 0)'))
    # ------------------------------------------------------------------------------------
  )
"""


"\ninventory_with_metrics = (\n  inventory_on_hand\n    \n    # AVERAGE ON-HAND UNITS PRIOR TO REPLENISHMENT\n    # ------------------------------------------------------------------------------------\n    # getting prior day's on-hand inventory only for the days with replenishment\n    .withColumn('prior_inventory', f.expr('LAG(estimated_on_hand_inventory, 1) OVER(PARTITION BY store_id, sku ORDER BY date)'))\n    .withColumn('prior_inventory', f.expr('COALESCE(prior_inventory,0)'))\n    .withColumn('prior_inventory', f.expr('CASE WHEN replenishment_flag=1 THEN prior_inventory ELSE 0 END'))\n  \n    # calculating rolling average of prior day's on-hand inventory for days with replenishment (over last 90 days)\n    .withColumn('rolling_stock_onhand', f.expr('''\n      SUM(prior_inventory) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) /\n      (SUM(replenishment_flag) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CUR

In [31]:
inventory_with_metrics = (
    inventory_on_hand

    # ON-HAND UNITS JUST BEFORE REPLENISHMENT
    # ------------------------------------------------------------------------------------
    # previous row's estimated on-hand inventory within the store-SKU
    .withColumn(
        'prior_inventory',
        f.expr('LAG(estimated_on_hand_inventory, 1) OVER(PARTITION BY store_id, sku ORDER BY date)'),
    )
    # no previous row -> treat as zero
    .withColumn(
        'prior_inventory',
        f.expr('COALESCE(prior_inventory,0)'),
    )
    # keep the value only on replenishment days; every other day contributes zero
    .withColumn(
        'prior_inventory',
        f.expr('CASE WHEN replenishment_flag=1 THEN prior_inventory ELSE 0 END'),
    )
)

In [33]:
inventory_with_metrics = (
    inventory_with_metrics

    # rolling average of pre-replenishment on-hand units: the sum of prior_inventory
    # over the current row plus the 90 preceding rows (91 rows in total), divided by
    # the number of replenishment days in the same window plus one
    # NOTE(review): the "+ 1" presumably guards against division by zero - confirm
    .withColumn(
        'rolling_stock_onhand',
        f.expr('''
      SUM(prior_inventory) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) /
      (SUM(replenishment_flag) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) + 1)
      '''),
    )
    # the rolling figure is only kept on replenishment days
    .withColumn(
        'rolling_min_expected_stock',
        f.expr('CASE WHEN replenishment_flag != 1 THEN 0 ELSE rolling_stock_onhand END'),
    )
    # nulls become zero
    .withColumn(
        'rolling_min_expected_stock',
        f.expr('COALESCE(rolling_min_expected_stock,0)'),
    )
)

In [34]:
inventory_with_metrics = (
    inventory_with_metrics

    # forward fill of the minimum expected stock across all dates:
    # 1) turn zeros into nulls so they count as "no value"
    .withColumn(
        'min_expected_stock',
        f.expr('NULLIF(rolling_min_expected_stock,0)'),
    )
    # 2) take the latest non-null value seen so far within the store-SKU
    .withColumn(
        'min_expected_stock',
        f.expr('LAST(min_expected_stock, True) OVER(PARTITION BY store_id, sku ORDER BY date)'),
    )
    # 3) dates before the first non-null value fall back to zero
    .withColumn(
        'min_expected_stock',
        f.expr('COALESCE(min_expected_stock, 0)'),
    )
    # ------------------------------------------------------------------------------------
)

In [35]:
inventory_with_metrics = (
    inventory_with_metrics

    # AVERAGE DAILY SALES
    # ------------------------------------------------------------------------------------
    # sales velocity: mean unit sales over the current row plus the 90 preceding rows
    .withColumn(
        'daily_sales_units',
        f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW)'),
    )
    # carry the latest non-null average forward
    .withColumn(
        'daily_sales_units',
        f.expr('LAST(daily_sales_units, True) OVER(PARTITION BY store_id, sku ORDER BY date)'),
    )
    # anything still null becomes zero
    .withColumn(
        'daily_sales_units',
        f.expr('COALESCE(daily_sales_units, 0)'),
    )
    # ------------------------------------------------------------------------------------
)

In [36]:
inventory_with_metrics.show()

+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+---------------+--------------------+--------------------------+------------------+------------------+
|store_id|sku|      date|product_category|on_hand_inventory_units|total_sales_units|replenishment_units|replenishment_flag|units_on_order|units_in_transit|units_in_dc|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_stock_onhand|rolling_min_expected_stock|min_expected_stock| daily_sales_units|
+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+---------------+--------------------+--------------------------+------------------+------------------+
|      63| 57|2019-01-01|     Category 04|          

In [38]:
# inventory_with_metrics.toPandas().to_csv("Dataset//inventory_with_metrics_data.csv", index=False)

## Calculate Shortest Lead Time for Each Store-SKU

In [40]:
# align the vendor key with the inventory data, which calls the item identifier `sku`
df_vendor = df_vendor.withColumnRenamed(
    'item_id',
    'sku',
)

In [42]:
df_vendor.head(3)

[Row(key=1, vendor_id=1, sub_vendor_id=1001, store_id=1763, sku=1, LEAD_TIME_IN_DC=4, LEAD_TIME_IN_TRANSIT=3, LEAD_TIME_ON_ORDER=7),
 Row(key=2, vendor_id=1, sub_vendor_id=1001, store_id=1763, sku=2, LEAD_TIME_IN_DC=2, LEAD_TIME_IN_TRANSIT=1, LEAD_TIME_ON_ORDER=7),
 Row(key=3, vendor_id=1, sub_vendor_id=1002, store_id=1843, sku=2, LEAD_TIME_IN_DC=2, LEAD_TIME_IN_TRANSIT=1, LEAD_TIME_ON_ORDER=7)]

In [43]:

# shortest of the three lead-time stages on each vendor record
# NOTE(review): rows are not aggregated here, so this is one value per store-SKU
# only if the vendor data holds a single row per store-SKU - confirm.
lead_time = (
    df_vendor
    .withColumn(
        'min_lead_time',
        f.expr('LEAST(lead_time_in_dc, lead_time_in_transit, lead_time_on_order)'),
    )
    .select([
        'store_id',
        'sku',
        'min_lead_time',
        'lead_time_in_dc',
        'lead_time_in_transit',
        'lead_time_on_order',
    ])
)
 


In [44]:
lead_time.show()

+--------+---+-------------+---------------+--------------------+------------------+
|store_id|sku|min_lead_time|lead_time_in_dc|lead_time_in_transit|lead_time_on_order|
+--------+---+-------------+---------------+--------------------+------------------+
|    1763|  1|            3|              4|                   3|                 7|
|    1763|  2|            1|              2|                   1|                 7|
|    1843|  2|            1|              2|                   1|                 7|
|    1763|  3|            3|              4|                   3|                 7|
|     486|  6|            1|              2|                   1|                 8|
|    1587|  7|            2|              3|                   2|                 9|
|    1556|  8|            3|              4|                   3|                 7|
|    1283| 39|            2|              3|                   2|                 6|
|    1763| 46|            1|              2|                   1|

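__We can now calculate the safety stock requirement as the lower of two values derived from the data: the minimum expected stock at replenishment, and the stock needed to cover average daily sales over the shortest lead time:__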

## Determine Safety Stock

In [45]:

inventory_safety_stock = (
    inventory_with_metrics

    # left outer join: inventory rows without vendor lead times are kept
    .join(
        lead_time,
        on=['store_id', 'sku'],
        how='leftouter',
    )

    # stock needed to cover average daily sales over the shortest lead time
    .withColumn(
        'ss_sales_velocity',
        f.expr('daily_sales_units * min_lead_time'),
    )

    # candidate safety stock: the smaller of min_expected_stock and ss_sales_velocity
    .withColumn(
        'safety_stock',
        f.expr('CASE WHEN min_expected_stock < ss_sales_velocity THEN min_expected_stock ELSE ss_sales_velocity END'),
    )
    # the candidate only applies on replenishment days
    .withColumn(
        'safety_stock',
        f.expr('CASE WHEN replenishment_flag != 1 THEN 0 ELSE safety_stock END'),
    )
    # a missing candidate (for example, no lead time available) counts as zero
    .withColumn(
        'safety_stock',
        f.expr('COALESCE(safety_stock,0)'),
    )
    # wherever the result is zero, fall back to min_expected_stock
    .withColumn(
        'safety_stock',
        f.expr('CASE WHEN safety_stock=0 THEN min_expected_stock ELSE safety_stock END'),
    )

    .select([
        'date',
        'store_id',
        'sku',
        'product_category',
        'total_sales_units',
        'on_hand_inventory_units',
        'replenishment_units',
        'replenishment_flag',
        'phantom_inventory',
        'estimated_on_hand_inventory',
        'prior_inventory',
        'rolling_min_expected_stock',
        'min_expected_stock',
        'daily_sales_units',
        'safety_stock',
        'units_on_order',
        'units_in_transit',
        'units_in_dc',
        'lead_time_in_transit',
        'lead_time_in_dc',
        'lead_time_on_order',
    ])
)
 


In [46]:
inventory_safety_stock.show()

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+------------------+------------+--------------+----------------+-----------+--------------------+---------------+------------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|replenishment_flag|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_min_expected_stock|min_expected_stock| daily_sales_units|safety_stock|units_on_order|units_in_transit|units_in_dc|lead_time_in_transit|lead_time_in_dc|lead_time_on_order|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+------------------+------------+---------

With a __safety stock level defined for each store-SKU__, we can now identify dates where:

- The estimated on-hand inventory is at or below the safety stock level (on_hand_less_than_safety_stock)

- The units already in the inventory pipeline (on order, in transit, or in the DC) are not sufficient to bring on-hand inventory back up to the safety stock level (insufficient_inventory_pipeline_units)

- The inventory pipeline is one day away from not being able to fulfill stocking requirements (insufficient_lead_time)

Each of these conditions represents an inventory management problem which requires addressing. The first two of these conditions may be calculated as follows:

### Identify Insufficient Inventory On-Hand & In-Pipeline Events

In [47]:

inventory_safety_stock_alert = (
    inventory_safety_stock

    # alert 1 - estimated on-hand inventory is at or below the safety stock level
    .withColumn(
        'on_hand_less_than_safety_stock',
        f.expr('CASE WHEN estimated_on_hand_inventory <= safety_stock THEN 1 ELSE 0 END'),
    )

    # alert 2 - alert 1 is raised, the pipeline (on order + in transit + in DC) is not
    # empty, and it still holds fewer units than the shortfall against safety stock
    .withColumn(
        'insufficient_inventory_pipeline_units',
        f.expr('''
    CASE
      WHEN  (on_hand_less_than_safety_stock = 1) AND 
            (units_on_order + units_in_transit + units_in_dc != 0) AND
            ((units_on_order + units_in_transit + units_in_dc) < (safety_stock - estimated_on_hand_inventory))
         THEN 1
      ELSE 0
      END'''),
    )
)
 


In [48]:
inventory_safety_stock_alert.show()

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+------------------+------------+--------------+----------------+-----------+--------------------+---------------+------------------+------------------------------+-------------------------------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|replenishment_flag|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_min_expected_stock|min_expected_stock| daily_sales_units|safety_stock|units_on_order|units_in_transit|units_in_dc|lead_time_in_transit|lead_time_in_dc|lead_time_on_order|on_hand_less_than_safety_stock|insufficient_inventory_pipeline_units|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------

## Identify Insufficient Lead Time Events

In [49]:
# attach an expected lead time to the inventory records that qualify for one
inventory_safety_stock_with_lead_times = (
    inventory_safety_stock_alert

    # missing stage-level lead times count as zero
    .withColumn(
        'lead_time_in_transit',
        f.expr('COALESCE(lead_time_in_transit,0)'),
    )
    .withColumn(
        'lead_time_on_order',
        f.expr('COALESCE(lead_time_on_order,0)'),
    )
    .withColumn(
        'lead_time_in_dc',
        f.expr('COALESCE(lead_time_in_dc,0)'),
    )

    # a lead time is assigned only when on-hand stock is at or below safety stock,
    # the pipeline is not empty, and the pipeline covers the shortfall; it is the
    # longest stage lead time plus one day, and null on every other row
    .withColumn(
        'lead_time',
        f.expr('''
       CASE
         WHEN on_hand_less_than_safety_stock = 1 AND
              (units_on_order + units_in_transit + units_in_dc != 0) AND
              ((units_on_order + units_in_transit + units_in_dc) >= (safety_stock - estimated_on_hand_inventory)) 
           THEN GREATEST(
                 COALESCE(lead_time_in_transit,0),
                 COALESCE(lead_time_on_order,0),
                 COALESCE(lead_time_in_dc,0)
                 )+1
         ELSE null 
         END'''),
    )
)

In [51]:
 
# identify lead time problems
lead_time_alerts = (

  inventory_safety_stock_with_lead_times.alias('a')

    # self join: pair every record (a) with each strictly earlier record (b) of the
    # same store-SKU that carries a lead time; records with no such predecessor are
    # kept with null b-side values
    .join(
      (inventory_safety_stock_with_lead_times
          .filter(
            f.expr('lead_time Is Not Null') # considering only non-null records
             ).alias('b')
        ),
      on=f.expr('a.store_id=b.store_id AND a.sku=b.sku AND a.date > b.date'),
      how='leftouter'
      )
    # collapse back to one row per store-SKU-date
    .groupBy('a.store_id','a.sku','a.date')
      .agg(
          # the record's own lead time (null when none was assigned on that date)
          f.max('a.lead_time').alias('lead_time'),
          # most recent earlier day on which a lead time was assigned to the inventory pipeline
          f.max('b.date').alias('lead_date'),
          # lead time assigned on that most recent day: taken from the b record with the
          # latest date, not the largest lead time seen over the whole history
          f.expr('MAX_BY(b.lead_time, b.date)').alias('prev_lead_time')
        ))

In [52]:

lead_time_alerts = (
    lead_time_alerts

    # days elapsed since the most recent lead time was assigned
    .withColumn(
        'date_diff',
        f.expr('DATEDIFF(date, lead_date)'),
    )
    # flag records that have no lead time of their own once the elapsed days have
    # reached or passed the previously assigned lead time (prev_lead_time)
    .withColumn(
        'insufficient_lead_time',
        f.expr('''
      CASE
        WHEN lead_time IS NULL AND (prev_lead_time - date_diff) <= 0 THEN 1
        ELSE 0
        END
      '''),
    )
    .select([
        'date',
        'store_id',
        'sku',
        'insufficient_lead_time',
    ])

    # bring back the inventory fields and the first two alerts for the same store-SKU-date
    .join(
        inventory_safety_stock_alert,
        on=['store_id', 'sku', 'date'],
    )
)
 
 


In [53]:
lead_time_alerts.show()

+--------+---+----------+----------------------+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+-------------------+------------------+--------------+----------------+-----------+--------------------+---------------+------------------+------------------------------+-------------------------------------+
|store_id|sku|      date|insufficient_lead_time|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|replenishment_flag|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_min_expected_stock|min_expected_stock|  daily_sales_units|      safety_stock|units_on_order|units_in_transit|units_in_dc|lead_time_in_transit|lead_time_in_dc|lead_time_on_order|on_hand_less_than_safety_stock|insufficient_inventory_pipeline_units|
+--------+---+----------+----------------------+----------------+---------

Combining these conditions, we might flag out-of-stock situations that require attention as follows:

## Consolidated Events

In [54]:

consolidated_oos_alerts = (
    lead_time_alerts

    # one out-of-stock indicator per record: on-hand stock is at or below safety stock
    # AND either the pipeline holds too few units or, failing that, the lead time has run out
    .withColumn(
        'alert_indicator',
        f.expr('''
      CASE
        WHEN on_hand_less_than_safety_stock = 1 AND insufficient_inventory_pipeline_units = 1 THEN 1 
        WHEN on_hand_less_than_safety_stock = 1 AND insufficient_inventory_pipeline_units != 1 AND insufficient_lead_time = 1 THEN 1
        ELSE 0
        END'''),
    )
    .select([
        'date',
        'store_id',
        'sku',
        'product_category',
        'total_sales_units',
        'daily_sales_units',
        'alert_indicator',
    ])
)
 


In [55]:
consolidated_oos_alerts.show()

+----------+--------+---+----------------+-----------------+-------------------+---------------+
|      date|store_id|sku|product_category|total_sales_units|  daily_sales_units|alert_indicator|
+----------+--------+---+----------------+-----------------+-------------------+---------------+
|2019-05-11|      63| 57|     Category 04|                0| 0.8681318681318682|              1|
|2019-09-12|      63| 57|     Category 04|                0| 0.7032967032967034|              0|
|2020-02-24|      63| 57|     Category 04|                3| 0.9560439560439561|              0|
|2020-10-25|      63| 57|     Category 04|                2| 0.4945054945054945|              0|
|2019-04-20|      98| 64|     Category 01|                8| 1.4945054945054945|              0|
|2020-07-16|      98| 64|     Category 01|                0|                0.0|              1|
|2020-09-23|      98| 64|     Category 01|                0|                0.0|              1|
|2020-12-05|      98| 64|     

# Step 4: Identify Zero Sales Issues

### Calculate Ratio of Zero Sales Days to Total Sales Days by Store-SKU

In [57]:

# For every store-SKU, compute the share of observed days on which nothing was sold.

# Tag each daily record: 1 if it had zero sales, plus a constant 1 so that
# summing it yields the number of records for the store-SKU.
daily_sales_flags = (
    df_rw
    .withColumn('total_zero_sales_days', f.expr('CASE WHEN total_sales_units == 0 THEN 1 ELSE 0 END'))
    .withColumn('total_days', f.expr('1'))
)

# Roll the per-day tags up to one row per store-SKU.
store_sku_day_counts = daily_sales_flags.groupBy('store_id', 'sku').agg(
    f.sum('total_days').alias('total_days'),
    f.sum('total_zero_sales_days').alias('total_zero_sales_days'),
)

# Ratio of zero-sales days to all observed days.
zero_sales_totals = store_sku_day_counts.withColumn(
    'zero_sales_day_probability',
    f.expr('total_zero_sales_days / total_days'),
)
 


In [58]:
# Preview the per store-SKU day counts and the resulting zero-sales ratio.
zero_sales_totals.show()

+--------+---+----------+---------------------+--------------------------+
|store_id|sku|total_days|total_zero_sales_days|zero_sales_day_probability|
+--------+---+----------+---------------------+--------------------------+
|    1540|155|       854|                  654|         0.765807962529274|
|    1283| 39|       854|                  794|        0.9297423887587822|
|     339| 64|       854|                  550|        0.6440281030444965|
|     397| 64|       854|                  584|        0.6838407494145199|
|    1634|110|       854|                  572|        0.6697892271662763|
|     334| 52|       854|                  794|        0.9297423887587822|
|     171| 64|       854|                  656|        0.7681498829039812|
|     373| 64|       854|                  569|        0.6662763466042154|
|    1616| 64|       854|                  794|        0.9297423887587822|
|     679| 64|       854|                  655|        0.7669789227166276|
|     999| 64|       854|

### Calculate Consecutive Zero Sales Days

In [59]:

# Ordered (new column, SQL expression) steps. Order matters: each step may read
# columns created by the steps before it.
zero_sales_steps = [
    # Mark the first day of a zero-sales run: today is zero and the previous
    # day (same store-SKU) was non-zero. The very first record of a store-SKU
    # has no previous day (LAG is NULL), so it is not marked; in the displayed
    # output such leading zero-sales days stay in rank 0.
    ('sales_change_flag', '''
        CASE 
          WHEN total_sales_units=0 AND LAG(total_sales_units,1) OVER(PARTITION BY store_id, sku ORDER BY date) != 0 THEN 1 
          ELSE 0 
          END'''),

    # Running count of runs started so far; acts as an id tying each record to a run.
    ('zero_sales_flag_rank', 'SUM(sales_change_flag) OVER(PARTITION BY store_id, sku ORDER BY date)'),

    # 1 on every zero-sales day, 0 otherwise.
    ('sales_change_flag_inv', 'CASE WHEN total_sales_units = 0 THEN 1 ELSE 0 END'),

    # Running count of zero-sales days inside the current run ...
    ('total_days_wo_sales', 'SUM(sales_change_flag_inv) OVER(PARTITION BY store_id, sku, zero_sales_flag_rank ORDER BY date)'),

    # ... reset to 0 on any day that did have sales.
    ('total_days_wo_sales', 'CASE WHEN total_sales_units != 0 THEN 0 ELSE total_days_wo_sales END'),
]

# Apply the steps one after another, starting from the raw daily data.
running_frame = df_rw
for step_column, step_sql in zero_sales_steps:
    running_frame = running_frame.withColumn(step_column, f.expr(step_sql))

# Keep the record keys, the sales figure and the run bookkeeping columns.
zero_sales_days = running_frame.select(
    'date',
    'store_id',
    'sku',
    'total_sales_units',
    'zero_sales_flag_rank',
    'sales_change_flag_inv',
    'total_days_wo_sales',
)
 


In [60]:
# Sort by store, SKU and date so each store-SKU series prints chronologically,
# which makes the consecutive-day counter easy to follow in the preview.
zero_sales_days.orderBy('store_id','sku','date').show()

+----------+--------+---+-----------------+--------------------+---------------------+-------------------+
|      date|store_id|sku|total_sales_units|zero_sales_flag_rank|sales_change_flag_inv|total_days_wo_sales|
+----------+--------+---+-----------------+--------------------+---------------------+-------------------+
|2019-01-01|      63| 57|                0|                   0|                    1|                  1|
|2019-01-02|      63| 57|                0|                   0|                    1|                  2|
|2019-01-03|      63| 57|                0|                   0|                    1|                  3|
|2019-01-04|      63| 57|                0|                   0|                    1|                  4|
|2019-01-05|      63| 57|                3|                   0|                    0|                  0|
|2019-01-06|      63| 57|                2|                   0|                    0|                  0|
|2019-01-07|      63| 57|            

## Calculate Cumulative Probability of Zero Sales Event

In [61]:

# Attach each store-SKU's overall zero-sales ratio to its daily run counters.
days_with_zero_sales_ratio = zero_sales_days.join(
    zero_sales_totals.alias('prob'),
    on=['store_id', 'sku'],
    how='leftouter',
)

# Ratio raised to the length of the current zero-sales run, i.e. the chance of
# a run this long if each day were an independent draw at that ratio.
# A run length of 0 (a day with sales) yields 1.
run_probability = f.expr('pow(zero_sales_day_probability, total_days_wo_sales)')

# Flag the record when the run has become unlikely (probability below 0.05).
zero_sales_inventory = (
    days_with_zero_sales_ratio
    .withColumn('zero_sales_probability', run_probability)
    .withColumn('no_sales_flag', f.expr('CASE WHEN zero_sales_probability < 0.05 THEN 1 ELSE 0 END'))
)
 


In [62]:
# Sort by store, SKU and date so the run length, its probability and the
# resulting no_sales_flag can be read day by day for each store-SKU.
zero_sales_inventory.orderBy('store_id','sku','date').show()

+--------+---+----------+-----------------+--------------------+---------------------+-------------------+----------+---------------------+--------------------------+----------------------+-------------+
|store_id|sku|      date|total_sales_units|zero_sales_flag_rank|sales_change_flag_inv|total_days_wo_sales|total_days|total_zero_sales_days|zero_sales_day_probability|zero_sales_probability|no_sales_flag|
+--------+---+----------+-----------------+--------------------+---------------------+-------------------+----------+---------------------+--------------------------+----------------------+-------------+
|      63| 57|2019-01-01|                0|                   0|                    1|                  1|       854|                  654|         0.765807962529274|     0.765807962529274|            0|
|      63| 57|2019-01-02|                0|                   0|                    1|                  2|       854|                  654|         0.765807962529274|    0.586461835473

# Step 5: Identify Alert Conditions

## Combining all flags

In [63]:

# Every alert table is keyed by store, SKU and day.
alert_join_keys = ['store_id', 'sku', 'date']

# Start from the out-of-stock alerts and left-join the phantom inventory
# indicator, so every out-of-stock record is kept even without a match.
oos_with_phantom = consolidated_oos_alerts.alias('oos').join(
    phantom_inventory.alias('pi'),
    on=alert_join_keys,
    how='leftouter',
)

# Then left-join the zero sales alert on the same keys.
oos_with_all_signals = oos_with_phantom.join(
    zero_sales_inventory,
    on=alert_join_keys,
    how='leftouter',
)

# One row per record with all three signals side by side.
# total_sales_units also exists in zero_sales_inventory, so it is taken
# explicitly from the 'oos' side; daily_sales_units is qualified the same way
# (presumably it also appears in phantom_inventory - defined outside this section).
all_alerts = oos_with_all_signals.selectExpr(
    'date',
    'store_id',
    'sku',
    'product_category',
    'oos.total_sales_units',
    'oos.alert_indicator as oos_alert',
    'oos.daily_sales_units',
    'no_sales_flag as zero_sales_flag',
    'phantom_inventory',
    'phantom_inventory_ind',
)
 


In [None]:
# all_alerts.show()

### Saving Flagged Inventory Data for Future Use

In [64]:
# Persist the combined alerts as a single CSV file (no pandas index column).
# NOTE: toPandas() materialises the whole Spark DataFrame as a pandas object in
# the driver process before it is written, so the result must fit in driver memory.
all_alerts.toPandas().to_csv("Dataset//inventory_flagged_data.csv", index=False)