# Lab08 Case Study for Spark Structured APIs

| Source from [Databricks Github on-shelf-availability](https://github.com/databricks-industry-solutions/on-shelf-availability)

The definition of [on-shelf-availability](https://help.inspector-cloud.com/en/docs/business/kpi/osa/)

---

## Step 1: Access Data

We will identify potential out-of-stock and on-shelf availability issues requiring further scrutiny through the analysis of store inventory records:

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/osa_tredence_alerts.jpg' width=75%>

Out-of-stock (OOS) scenarios occur when a retailer does not have enough inventory to meet consumer demand.  When an insufficient number of product units are made available to customers, not only are immediate sales lost but consumer confidence in the retailer is eroded. Out-of-stocks occur for a variety of reasons. Poor forecasting, limited supply, and operational challenges are all common causes. With each, swift action is required to identify and address the source of the problem less they continue to impact sales.  The challenge with out of stocks is that by the time it is identified, the lead time for requesting replacement units and making them available on the shelf for the consumer may require the retailer to live with the issue for quite some time. It is therefore important that any analysis of stocking levels consider the time to replenishment associated with a given item and location.

A bit different from OOS issues are on-shelf availability (OSA) problems where inventory may be in the store but it's not placed in a manner that makes it easily accessible to customers. Product may be in inventory but the principal display may give the impression the item is out of stock or in low quantity.  Items may be on the shelf but not pulled forward in a manner that makes them easily viewable by customers.  Product may be technically in inventory but in a backroom that's not accessible to customers. Regardless of the reason, OSA issues tend to lead to lost revenue for retailers.

To illustrate how analysis of OOS and OSA issues may be performed, Tredence has made available a simulated set of inventory and vendor data available for download [here](https://github.com/tredenceofficial/OSA-Data). To make these data available for use with this and the related notebooks, download the CSV files and then load them to your cloud storage environment.  

We have automated this downloading step for you and used a */tmp/osa* storage path instead through out this accelerator.

In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext 
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql.types import *
import pyspark.sql.functions as f
import pyspark
import pandas as pd

pd.options.mode.copy_on_write = True


spark = SparkSession.builder\
  .config("spark.executor.memory", "2g") \
  .config("spark.driver.memory", "2g") \
  .config("spark.log.level", "ERROR") \
  .appName("lab8_exercise").getOrCreate()



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/10 07:20:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting Spark log level to "ERROR".


In [2]:

# schema for inventory data
inventory_schema = StructType([
  StructField('date',DateType()),
  StructField('store_id',IntegerType()),
  StructField('sku',IntegerType()),
  StructField('product_category',StringType()),
  StructField('total_sales_units',IntegerType()),
  StructField('on_hand_inventory_units',IntegerType()),
  StructField('replenishment_units',IntegerType()),
  StructField('inventory_pipeline',IntegerType()),
  StructField('units_in_transit',IntegerType()),
  StructField('units_in_dc',IntegerType()),
  StructField('units_on_order',IntegerType()),
  StructField('units_under_promotion',IntegerType()),
  StructField('shelf_capacity',IntegerType())
  ])

# read inventory data 
df1 = spark.read.csv(
       'osa_raw_data.csv',
       header = True,
       schema = inventory_schema,
       dateFormat = 'yyyyMMdd'
       ).repartition(3) 
df1.createOrReplaceTempView("inventory_raw")
spark.table('inventory_raw').show(10)

                                                                                

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|inventory_pipeline|units_in_transit|units_in_dc|units_on_order|units_under_promotion|shelf_capacity|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+
|2021-04-12|     381| 64|     Category 01|                0|                      7|                  0|                 7|               0|          0|             0|                    0|            18|
|2020-06-06|     531| 64|     Category 01|                0|                      0|                  0|                 0|               0|          0|             0|             

In [3]:
# schema for vendor data
vendor_schema = StructType([
  StructField('key',IntegerType()),
  StructField('vendor_id',IntegerType()),
  StructField('sub_vendor_id',IntegerType()),
  StructField('store_id',IntegerType()),
  StructField('item_id',IntegerType()),
  StructField('lead_time_in_dc',IntegerType()),
  StructField('lead_time_in_transit',IntegerType()),
  StructField('lead_time_on_order',IntegerType()),
])

# read vendor data and persist as delta table
df2 = spark.read.csv(
     'vendor_leadtime_info.csv',
     header = True,
     schema = vendor_schema
     ).withColumnRenamed('item_id','sku') # rename item_id to sku for consistency with inventory data
   
df2.createOrReplaceTempView("osa_vendor")

# review data
spark.table('osa_vendor').show(10)

+---+---------+-------------+--------+---+---------------+--------------------+------------------+
|key|vendor_id|sub_vendor_id|store_id|sku|lead_time_in_dc|lead_time_in_transit|lead_time_on_order|
+---+---------+-------------+--------+---+---------------+--------------------+------------------+
|  1|        1|         1001|    1763|  1|              4|                   3|                 7|
|  2|        1|         1001|    1763|  2|              2|                   1|                 7|
|  3|        1|         1002|    1843|  2|              2|                   1|                 7|
|  4|        1|         1001|    1763|  3|              4|                   3|                 7|
|  5|        2|         2016|     486|  6|              2|                   1|                 8|
|  6|        2|         2073|    1587|  7|              3|                   2|                 9|
|  7|        2|         2087|    1556|  8|              4|                   3|                 7|
|  8|     

## Step 2: Address Missing Records

The inventory data contains records for products in specific stores when an inventory-related transaction occurs. Since not every product *moves* on every date, there will be days for which there is no data for certain store and product SKU combinations. 

Time series analysis techniques used in our framework require a complete set of records for products within a given location. To address the *missing* entries, we will generate a list of all dates for which we expect records. A cross-join with store-SKU combinations will provide the base set of records for which we expect data.  

In the real world, not all products are intended to be sold at each location at all times.  In an analysis of non-simulated data, we may require additional information to determine the complete set of dates for a given store-SKU combination for which we should have data:

In [4]:
# calculate start and end of inventory dataset
start_date, end_date = (
  spark.table('inventory_raw').groupBy()
    .agg(
      f.min('date').alias('start_date'),
      f.max('date').alias('end_date')  
        )
  .collect()[0]
  )

# generate contiguous set of dates within start and end range
dates = (
  spark
    .range( (end_date - start_date).days + 1 )  # days in range
    .withColumn('id', f.expr('cast(id as integer)')) # range value from long (bigint) to integer
    .withColumn('date', f.lit(start_date) + f.col('id'))  # add range value to start date to generate contiguous date range
    .select('date')
  )

# display dates
dates.orderBy('date').show(5)

+----------+
|      date|
+----------+
|2019-01-01|
|2019-01-02|
|2019-01-03|
|2019-01-04|
|2019-01-05|
+----------+
only showing top 5 rows



                                                                                

In [5]:
# extract unique store-sku combinations in inventory records
store_skus = (
  spark
    .table('inventory_raw')
    .select('store_id','sku','product_category')
    .groupBy('store_id','sku')
      .agg(f.last('product_category').alias('product_category')) # just a hack to get last category assigned to each store-sku combination
  )

store_skus.show(5)

+--------+---+----------------+
|store_id|sku|product_category|
+--------+---+----------------+
|      63| 57|     Category 04|
|      98| 64|     Category 01|
|     164| 76|     Category 05|
|     171| 64|     Category 01|
|     178| 64|     Category 01|
+--------+---+----------------+
only showing top 5 rows



We can now cross-join the contiguous dates with each unique store-SKU found in the inventory dataset to create the expected records in our complete dataset.  Left outer joining these data to our actual inventory data, we will now have a complete set of records though there will be missing values in many fields which we will need to address in our next step:

In [6]:
# generate one record for each store-sku for each date in range
inventory_with_gaps = (
  dates
    .crossJoin(store_skus)
    .join(
      spark.table('inventory_raw').drop('product_category'), 
      on=['date','store_id','sku'], 
      how='leftouter'
      )
  )

# display inventory records
inventory_with_gaps.show(5)

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|inventory_pipeline|units_in_transit|units_in_dc|units_on_order|units_under_promotion|shelf_capacity|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+
|2019-01-01|      63| 57|     Category 04|             NULL|                   NULL|               NULL|              NULL|            NULL|       NULL|          NULL|                 NULL|          NULL|
|2019-01-02|      63| 57|     Category 04|                0|                      8|                  0|                16|               0|          0|             8|             

We now have one record for each date-store-SKU combination in our dataset.  However, on those dates for which there were no inventory changes, we are currently missing information about the inventory status of those stores and SKUs.  To address this, we will employ a combination of forward filling, *i.e.* applying the last valid record to subsequent records until a new value is encountered, and defaults.  For the forward fill, we will make use of the [last()](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.last.html) function, providing a value of *True* for the *ignorenulls* argument which will force it to retrieve the last non-null value in a sequence:

In [7]:
# copy dataframe to enable manipulations in loop
inventory_cleansed = inventory_with_gaps

# apply forward fill to appropriate columns
for c in ['shelf_capacity', 'on_hand_inventory_units']:
  inventory_cleansed = (
    inventory_cleansed
      .withColumn(
          c, 
          f.expr('LAST({0}, True) OVER(PARTITION BY store_id, sku ORDER BY date)'.format(c)) # get last non-null prior value (aka forward-fill)
           )
        )
  
# apply default value of 0 to appropriate columns
inventory_cleansed = (
  inventory_cleansed
    .fillna(
      0, 
      [ 'total_sales_units',
        'units_under_promotion',
        'units_in_transit',
        'units_in_dc',
        'units_on_order',
        'replenishment_units',
        'inventory_pipeline'
        ]
      )
  )

# display data with imputed values
inventory_cleansed.show(5)

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|inventory_pipeline|units_in_transit|units_in_dc|units_on_order|units_under_promotion|shelf_capacity|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+
|2019-01-01|      63| 57|     Category 04|                0|                   NULL|                  0|                 0|               0|          0|             0|                    0|          NULL|
|2019-01-02|      63| 57|     Category 04|                0|                      8|                  0|                16|               0|          0|             8|             

## Step 3: Identify Key Inventory Events

With our complete inventory dataset in-hand, we can now identify key inventory-related events within the data.  These include the occurrence of promotions intended to drive product sales and replenishment events during which new units are added to inventory:

In [8]:
# derive inventory flags
inventory_final = (
  inventory_cleansed
    .withColumn('promotion_flag', f.expr('CASE WHEN units_under_promotion > 0 THEN 1 ELSE 0 END'))
    .withColumn('replenishment_flag', f.expr('CASE WHEN replenishment_units > 0 THEN 1 ELSE 0 END'))
    )

inventory_final.show(5)

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+--------------+------------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|inventory_pipeline|units_in_transit|units_in_dc|units_on_order|units_under_promotion|shelf_capacity|promotion_flag|replenishment_flag|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+----------------+-----------+--------------+---------------------+--------------+--------------+------------------+
|2019-01-01|      63| 57|     Category 04|                0|                   NULL|                  0|                 0|               0|          0|             0|                    0|          NULL|             0|                 0|
|2019-01-02|      63| 57|     Category 04|  

We can now persist this data for later use:

In [9]:
# spark.catalog.dropTempView('osa_inventory')

# (
#   inventory_final
#     .repartition(3)
#     .write
#       .format('parquet')
#       .mode('overwrite')
#       .option('overwriteSchema', 'true')
#       .saveAsTable('osa_inventory')
#    )
inventory_final.createOrReplaceTempView("osa_inventory")

## Step 4: Access Data

Our first step is to access the inventory and vendor data assembled and cleansed in the last notebook:

In [10]:
inventory = spark.table('osa_inventory')
vendor = spark.table('osa_vendor')

## Step 5: Identify Phantom Inventory

Next, we will identify problems related to phantom inventory.  Phantom inventory represents product units not accounted for between the inventory and the sales systems.  These units may represent items misplaced, stolen, lost or otherwise inaccurately tracked in one system or the other.  The identification of these units is essential for ensuring the right quantity, *i.e.* not too few and not too many, of a given product are purchased for replenishment and may point to needed operational improvements to ensure accurate inventories are maintained moving forward.

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/osa_tredence_phantominventory.png' width=75%>

Phantom inventory is identified by simply calculating the number of units expected to be on-hand at the end of the day relative to those actually on-hand. Minor differences between the units expected and those actually in inventory may not require immediate attention.  A phantom inventory indicator flag is set when the phantom inventory is some multiple of the average daily sales for a given product. Here, we set this multiple to 5-times but some organizations may wish to be more or less sensitive to the detection of problematic levels of phantom inventory:

In [11]:
# phantom inventory calculations
phantom_inventory = (
  inventory
    
    # average daily sales
    .withColumn('daily_sales_units', f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date)')) 
    
    # on-hand inventory units at the end of previous day
    # for dates with no prior day inventory units, provide alt calculation
    .withColumn('start_on_hand_units', f.expr('''
      COALESCE( 
        LAG(on_hand_inventory_units, 1) OVER(PARTITION BY store_id, sku ORDER BY date), 
        on_hand_inventory_units + total_sales_units - replenishment_units
        )
        ''')) 
    
    # on-hand inventory units at end of day
    .withColumn('end_on_hand_units', f.expr('COALESCE(on_hand_inventory_units, 0)')) 
    
    # calculate phantom inventory as difference in:
    # (previous day's on-hand inventory + current day's replenished units - current day's sales units) and current day's end-of-day inventory 
    .withColumn('phantom_inventory', f.expr('start_on_hand_units + replenishment_units - total_sales_units - end_on_hand_units')) 
    
    # flag only when phantom inventory is at least 5 times average daily sales
    .withColumn('phantom_inventory_ind', f.expr('''
      CASE
        WHEN phantom_inventory <> 0 AND ABS(phantom_inventory) > 5 * daily_sales_units THEN 1 
        ELSE 0 
        END'''))  
  
    .select(
      'date',
      'store_id',
      'sku',
      'daily_sales_units',
      'start_on_hand_units',
      'replenishment_units',
      'total_sales_units',
      'end_on_hand_units',
      'phantom_inventory',
      'phantom_inventory_ind'
      )
    )

# display results
phantom_inventory.show(5)

+----------+--------+---+-----------------+-------------------+-------------------+-----------------+-----------------+-----------------+---------------------+
|      date|store_id|sku|daily_sales_units|start_on_hand_units|replenishment_units|total_sales_units|end_on_hand_units|phantom_inventory|phantom_inventory_ind|
+----------+--------+---+-----------------+-------------------+-------------------+-----------------+-----------------+-----------------+---------------------+
|2019-01-01|      63| 57|              0.0|               NULL|                  0|                0|                0|             NULL|                    0|
|2019-01-02|      63| 57|              0.0|                  8|                  0|                0|                8|                0|                    0|
|2019-01-03|      63| 57|              0.0|                  8|                  0|                0|                8|                0|                    0|
|2019-01-04|      63| 57|              0

## Step 6: Identify Out of Stocks

Out of stocks occur when inventory is not sufficient to meet demand.  Most retailers define a safety stock level that serves as the threshold for triggering replenishment orders.  When inventory dips below the safety stock level, a replenishment order is generated.  The remaining inventory on-hand must then be sufficient to meet demand until the replacement units arrive.

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/osa_tredence_safetystock.png' width=75%>

There are numerous ways to calculate a safety stock level for a given product.  Here, we will consider the store-specific dynamics associated with a product to arrive at two potentially valid safety stock levels.  The first of these is the number of units on-hand before past replenishment events. The second of these will consider average daily sales relative to lead times for a product. For a given store-SKU combination, we will take the lower of these two values to be our safety stock level.  But before we can calculate these values, we need to estimate the inventory on-hand, something we tackled in our prior phantom inventory calculations:

In [12]:
# combine inventory with phantom inventory and min lead times
inventory_with_pi = (
  inventory.alias('inv')
    .join(phantom_inventory.alias('pi'), on=['store_id','sku','date'])
  
    # limit fields to use moving forward
    .selectExpr(
      'inv.store_id',
      'inv.sku',
      'inv.date',
      'inv.product_category',
      'inv.on_hand_inventory_units',
      'inv.total_sales_units',
      'inv.replenishment_units',
      'inv.replenishment_flag',
      'inv.units_on_order',
      'inv.units_in_transit',
      'inv.units_in_dc',
      'pi.phantom_inventory'
      )
  
  # correct inventory values to enable calculations
  .withColumn('phantom_inventory', f.expr('COALESCE(phantom_inventory, 0)')) 
  .withColumn('on_hand_inventory_units', f.expr('''
              CASE 
                WHEN on_hand_inventory_units < 0 THEN 0 
                ELSE on_hand_inventory_units 
                END''')
             )
   .withColumn('replenishment_units', f.expr('''
              CASE 
                WHEN replenishment_flag = 1 THEN replenishment_units
                ELSE 0 
                END''')
             )
  
  # initialize estimated on-hand inventory field
   .withColumn('estimated_on_hand_inventory', f.lit(0)) 
  )

inventory_with_pi.show(5)

+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+
|store_id|sku|      date|product_category|on_hand_inventory_units|total_sales_units|replenishment_units|replenishment_flag|units_on_order|units_in_transit|units_in_dc|phantom_inventory|estimated_on_hand_inventory|
+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+
|      63| 57|2019-05-11|     Category 04|                      1|                0|                  0|                 0|             0|               0|          0|                0|                          0|
|      63| 57|2019-09-12|     Category 04|                     23|                0|                  0|                 0|             0|      

In [13]:
import warnings

# iterate over inventory to calculate current inventory levels
def get_estimated_inventory(inventory_pd: pd.DataFrame) -> pd.DataFrame:
  
  inventory_pd.sort_values('date', inplace=True)

  # iterate over records in inventory data
  for i in range(1,len(inventory_pd)):
    
    # get component values
    previous_inv = inventory_pd.estimated_on_hand_inventory.loc[i-1].copy()
    if previous_inv < 0: 
        previous_inv = 0
    
    replenishment_units = inventory_pd.replenishment_units.loc[i].copy()
    total_sales_units = inventory_pd.total_sales_units.loc[i].copy()
    phantom_inventory_units = inventory_pd.phantom_inventory.loc[i].copy()
    on_hand_inventory_units = inventory_pd.on_hand_inventory_units.loc[i].copy()
    
    # calculate estimated on-hand inventory
    estimated_on_hand_inventory = (previous_inv + replenishment_units - total_sales_units - phantom_inventory_units)
    if estimated_on_hand_inventory < 0: estimated_on_hand_inventory = 0
    if estimated_on_hand_inventory > on_hand_inventory_units: estimated_on_hand_inventory = on_hand_inventory_units
    
    with warnings.catch_warnings(record=True):
        inventory_pd.estimated_on_hand_inventory.loc[i] = estimated_on_hand_inventory

  return inventory_pd


# calculate estimated on-hand inventory
inventory_on_hand = (
  inventory_with_pi
  .groupby('store_id', 'sku')
    .applyInPandas( get_estimated_inventory, schema=inventory_with_pi.schema )
  )

inventory_on_hand.show(5)

ImportError: PyArrow >= 4.0.0 must be installed; however, it was not found.

Using this data, we can now calculate the average amount on-hand prior to a replenishment event as well as the average number of units sold for each store-SKU.  For both metrics, we'll limit the calculation to the 90 days prior to the current inventory record.  This will allow for changes in stocking practices and sales velocity over the dataset:

In [None]:
inventory_with_metrics = (
  inventory_on_hand
    
    # AVERAGE ON-HAND UNITS PRIOR TO REPLENISHMENT
    # ------------------------------------------------------------------------------------
    # getting prior day's on-hand inventory only for the days with replenishment
    .withColumn('prior_inventory', f.expr('LAG(estimated_on_hand_inventory, 1) OVER(PARTITION BY store_id, sku ORDER BY date)'))
    .withColumn('prior_inventory', f.expr('COALESCE(prior_inventory,0)'))
    .withColumn('prior_inventory', f.expr('CASE WHEN replenishment_flag=1 THEN prior_inventory ELSE 0 END'))
  
    # calculating rolling average of prior day's on-hand inventory for days with replenishment (over last 90 days)
    .withColumn('rolling_stock_onhand', f.expr('''
      SUM(prior_inventory) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) /
      (SUM(replenishment_flag) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW) + 1)
      '''
      ))
    .withColumn('rolling_min_expected_stock', f.expr('CASE WHEN replenishment_flag != 1 THEN 0 ELSE rolling_stock_onhand END'))
    .withColumn('rolling_min_expected_stock', f.expr('COALESCE(rolling_min_expected_stock,0)'))
    
    # fixing the inventory values for all dates through forward fill
    .withColumn('min_expected_stock', f.expr('NULLIF(rolling_min_expected_stock,0)'))
    .withColumn('min_expected_stock', f.expr('LAST(min_expected_stock, True) OVER(PARTITION BY store_id, sku ORDER BY date)'))
    .withColumn('min_expected_stock', f.expr('COALESCE(min_expected_stock, 0)'))
    # ------------------------------------------------------------------------------------
  
    # AVERAGE DAILY SALES
    # ------------------------------------------------------------------------------------
    # getting daily sales velocity
    .withColumn('daily_sales_units', f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 90 PRECEDING AND CURRENT ROW)'))
    .withColumn('daily_sales_units', f.expr('LAST(daily_sales_units, True) OVER(PARTITION BY store_id, sku ORDER BY date)'))
    .withColumn('daily_sales_units', f.expr('COALESCE(daily_sales_units, 0)'))
    # ------------------------------------------------------------------------------------
  )

inventory_with_metrics.show(10)

[Stage 147:>                                                        (0 + 1) / 1]

+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+---------------+--------------------+--------------------------+------------------+------------------+
|store_id|sku|      date|product_category|on_hand_inventory_units|total_sales_units|replenishment_units|replenishment_flag|units_on_order|units_in_transit|units_in_dc|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_stock_onhand|rolling_min_expected_stock|min_expected_stock| daily_sales_units|
+--------+---+----------+----------------+-----------------------+-----------------+-------------------+------------------+--------------+----------------+-----------+-----------------+---------------------------+---------------+--------------------+--------------------------+------------------+------------------+
|      63| 57|2019-01-01|     Category 04|          

                                                                                

To derive safety stock levels using average daily sales, we need to know something about the lead times for the replenishment of particular SKUs within a given store location.  We can derive this as follows:

In [None]:
# calculate shortest lead time for each store-sku combination
lead_time = (
  vendor
    .withColumn('min_lead_time', f.expr('LEAST(lead_time_in_dc, lead_time_in_transit, lead_time_on_order)'))
    .select('store_id', 'sku', 'min_lead_time', 'lead_time_in_dc', 'lead_time_in_transit', 'lead_time_on_order')
    )

lead_time.show(5)

+--------+---+-------------+---------------+--------------------+------------------+
|store_id|sku|min_lead_time|lead_time_in_dc|lead_time_in_transit|lead_time_on_order|
+--------+---+-------------+---------------+--------------------+------------------+
|    1763|  1|            3|              4|                   3|                 7|
|    1763|  2|            1|              2|                   1|                 7|
|    1843|  2|            1|              2|                   1|                 7|
|    1763|  3|            3|              4|                   3|                 7|
|     486|  6|            1|              2|                   1|                 8|
+--------+---+-------------+---------------+--------------------+------------------+
only showing top 5 rows



We can now calculate the safety stock requirements using the lower of the two values calculated from the data:

In [None]:
inventory_safety_stock = (
  inventory_with_metrics
    .join(lead_time, on=['store_id','sku'], how='leftouter')
  
    # safety stock for sales velocity is avg daily sales units * min_lead_time
    .withColumn('ss_sales_velocity', f.expr('daily_sales_units * min_lead_time'))
  
    # use the lower of the min_expected_stock at replinishment or sales_velocity-derived stock requirement as safety stock
    .withColumn('safety_stock', f.expr('CASE WHEN min_expected_stock < ss_sales_velocity THEN min_expected_stock ELSE ss_sales_velocity END'))
    .withColumn('safety_stock', f.expr('CASE WHEN replenishment_flag != 1 THEN 0 ELSE safety_stock END'))
    .withColumn('safety_stock', f.expr('COALESCE(safety_stock,0)'))
    .withColumn('safety_stock', f.expr('CASE WHEN safety_stock=0 THEN min_expected_stock ELSE safety_stock END'))
  
    .select(
      'date',
      'store_id',
      'sku',
      'product_category',
      'total_sales_units', 
      'on_hand_inventory_units',
      'replenishment_units', 
      'replenishment_flag',
      'phantom_inventory',
      'estimated_on_hand_inventory',  
      'prior_inventory',
      'rolling_min_expected_stock', 
      'min_expected_stock',
      'daily_sales_units', 
      'safety_stock', 
      'units_on_order',
      'units_in_transit',
      'units_in_dc',
      'lead_time_in_transit',
      'lead_time_in_dc',
      'lead_time_on_order'
      )
  )

inventory_safety_stock.show(5)

[Stage 187:>                                                        (0 + 1) / 1]

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+-----------------+------------------+--------------+----------------+-----------+--------------------+---------------+------------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|replenishment_flag|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_min_expected_stock|min_expected_stock|daily_sales_units|      safety_stock|units_on_order|units_in_transit|units_in_dc|lead_time_in_transit|lead_time_in_dc|lead_time_on_order|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+-----------------+-------------

                                                                                

With a safety stock level defined for each store-SKU, we can now identify dates where:</p>

1. The on-hand inventory is less than the safety stock level (*on_hand_less_than_safety_stock*)
2. The requested replinishment units are not sufficient to meet safety stock requirements (*insufficient_inventory_pipeline_units*)
3. The inventory pipeline is one day away from not being able to fulfill stocking requirements (*insufficient_lead_time*)

Each of these conditions represents an inventory management problem which requires addressing. The first two of these conditions may be calculated as follows:

In [None]:
inventory_safety_stock_alert = (
  inventory_safety_stock
  
  # alert 1 - estimated on-hand inventory is less than safety stock
  .withColumn('on_hand_less_than_safety_stock', f.expr('CASE WHEN estimated_on_hand_inventory <= safety_stock THEN 1 ELSE 0 END'))
  
  # alert 2 - inventory in pipeline is not sufficient to reach the safety stock levels
  .withColumn('insufficient_inventory_pipeline_units', f.expr('''
    CASE
      WHEN  (on_hand_less_than_safety_stock = 1) AND 
            (units_on_order + units_in_transit + units_in_dc != 0) AND
            ((units_on_order + units_in_transit + units_in_dc) < (safety_stock - estimated_on_hand_inventory))
         THEN 1
      ELSE 0
      END'''))
  )

inventory_safety_stock_alert.show(5)

[Stage 226:>                                                        (0 + 1) / 1]

+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+-----------------+------------------+--------------+----------------+-----------+--------------------+---------------+------------------+------------------------------+-------------------------------------+
|      date|store_id|sku|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|replenishment_flag|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_min_expected_stock|min_expected_stock|daily_sales_units|      safety_stock|units_on_order|units_in_transit|units_in_dc|lead_time_in_transit|lead_time_in_dc|lead_time_on_order|on_hand_less_than_safety_stock|insufficient_inventory_pipeline_units|
+----------+--------+---+----------------+-----------------+-----------------------+-------------------+------------------+-

                                                                                

We can identify dates where there was insufficient lead time in the pipeline to meet expected demand, the last of our three events, as follows:

In [None]:
# calculate lead times associated with inventory records
inventory_safety_stock_with_lead_times = (
  
  inventory_safety_stock_alert
  
    # lead time values at store-sku level for various stages 
    .withColumn('lead_time_in_transit', f.expr('COALESCE(lead_time_in_transit,0)'))
    .withColumn('lead_time_on_order', f.expr('COALESCE(lead_time_on_order,0)'))
    .withColumn('lead_time_in_dc', f.expr('COALESCE(lead_time_in_dc,0)'))

    # considering lead time only if estimated on-hand inventory and inventory in pipeline meet the safety stock levels
    .withColumn('lead_time', f.expr('''
       CASE
         WHEN on_hand_less_than_safety_stock = 1 AND
              (units_on_order + units_in_transit + units_in_dc != 0) AND
              ((units_on_order + units_in_transit + units_in_dc) >= (safety_stock - estimated_on_hand_inventory)) 
           THEN GREATEST(
                 COALESCE(lead_time_in_transit,0),
                 COALESCE(lead_time_on_order,0),
                 COALESCE(lead_time_in_dc,0)
                 )+1
         ELSE null 
         END'''))
  )

# identify lead time problems
lead_time_alerts = (
  
  inventory_safety_stock_with_lead_times.alias('a')
  
    # self join to get the previous lead time (most recent one) for the inventory pipeline
    .join(
      (inventory_safety_stock_with_lead_times
          .filter(
            f.expr('lead_time Is Not Null') # considering only non-null records
             ).alias('b')
        ), 
      on=f.expr('a.store_id=b.store_id AND a.sku=b.sku AND a.date > b.date'), 
      how='leftouter'
      )
    .groupBy('a.store_id','a.sku','a.date')
      .agg(
          f.max('a.lead_time').alias('lead_time'),
          f.max('b.date').alias('lead_date'), # day on which the lead time was assigned to the inventory pipeline
          f.max('b.lead_time').alias('prev_lead_time') # lead time assigned to the inventory pipeline
        )
  
    # flag is raised if difference in current date and lead date (from above) is greater than the lead time assigned (prev_lead_time)
    .withColumn('date_diff', f.expr('DATEDIFF(date, lead_date)'))
    .withColumn('insufficient_lead_time', f.expr('''
      CASE
        WHEN lead_time IS NULL AND (prev_lead_time - date_diff) <= 0 THEN 1
        ELSE 0
        END
      '''))
    .select(
      'date',
      'store_id',
      'sku',
      'insufficient_lead_time'
      )
  .join(
    inventory_safety_stock_alert,
    on=['store_id','sku','date']
    )
  )


lead_time_alerts.show(10)

[Stage 268:>                                                        (0 + 1) / 1]

+--------+---+----------+----------------------+----------------+-----------------+-----------------------+-------------------+------------------+-----------------+---------------------------+---------------+--------------------------+------------------+-------------------+------------------+--------------+----------------+-----------+--------------------+---------------+------------------+------------------------------+-------------------------------------+
|store_id|sku|      date|insufficient_lead_time|product_category|total_sales_units|on_hand_inventory_units|replenishment_units|replenishment_flag|phantom_inventory|estimated_on_hand_inventory|prior_inventory|rolling_min_expected_stock|min_expected_stock|  daily_sales_units|      safety_stock|units_on_order|units_in_transit|units_in_dc|lead_time_in_transit|lead_time_in_dc|lead_time_on_order|on_hand_less_than_safety_stock|insufficient_inventory_pipeline_units|
+--------+---+----------+----------------------+----------------+---------

                                                                                

Combining these conditions, we might flag out of stock situations that require attention as follows:

In [None]:
consolidated_oos_alerts = (
  lead_time_alerts
    .withColumn('alert_indicator', f.expr('''
      CASE
        WHEN on_hand_less_than_safety_stock = 1 AND insufficient_inventory_pipeline_units = 1 THEN 1 
        WHEN on_hand_less_than_safety_stock = 1 AND insufficient_inventory_pipeline_units != 1 AND insufficient_lead_time = 1 THEN 1
        ELSE 0
        END'''))
    .select(
      'date',
      'store_id',
      'sku',
      'product_category',
      'total_sales_units',
      'daily_sales_units',
      'alert_indicator'
      )
  )

consolidated_oos_alerts.show(5)

[Stage 323:>                                                        (0 + 1) / 1]

+----------+--------+---+----------------+-----------------+------------------+---------------+
|      date|store_id|sku|product_category|total_sales_units| daily_sales_units|alert_indicator|
+----------+--------+---+----------------+-----------------+------------------+---------------+
|2019-05-11|      63| 57|     Category 04|                0|0.8681318681318682|              0|
|2019-09-12|      63| 57|     Category 04|                0|0.7032967032967034|              0|
|2020-02-24|      63| 57|     Category 04|                3|0.9560439560439561|              0|
|2020-10-25|      63| 57|     Category 04|                2|0.4945054945054945|              0|
|2019-04-20|      98| 64|     Category 01|                8|1.4945054945054945|              1|
+----------+--------+---+----------------+-----------------+------------------+---------------+
only showing top 5 rows



                                                                                

## Step 7: Identify Zero Sales Issues

Not every product sells each day in each store location. But when a product goes unsold for a long period of time, it might be wise for someone to verify it is still in inventory.  

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/osa_tredence_zeroscan.png' width=75%>

The challenge is that what constitutes a *long* period of time.  Some products move relatively quickly while many products move only occasionally.  For a product that sales hundreds of units daily, we might suspect an inventory problem if we suddenly see no sales on a given day.  For a product that moves only a few units a month, a few days or even a few weeks with no units sold may be nothing to be concerned with.

With that in mind, we need to examine the number of days with zero units sold for each store-SKU combination relative to the total days a product is available for sale in order to understand the probability a product will experience a zero-sales day:

In [None]:
# calculate ratio of total days of zero sales and total days on shelf across observed period
zero_sales_totals = ( 
  inventory
    .withColumn('total_zero_sales_days', f.expr('CASE WHEN total_sales_units == 0 THEN 1 ELSE 0 END'))
    .withColumn('total_days', f.expr('1'))
    .groupBy(['store_id', 'sku'])
      .agg(
        f.sum('total_days').alias('total_days'),
        f.sum('total_zero_sales_days').alias('total_zero_sales_days')
        )
    .withColumn('zero_sales_day_probability', f.expr('total_zero_sales_days / total_days'))
    )

zero_sales_totals.show(5)

+--------+---+----------+---------------------+--------------------------+
|store_id|sku|total_days|total_zero_sales_days|zero_sales_day_probability|
+--------+---+----------+---------------------+--------------------------+
|    1540|155|       854|                  654|         0.765807962529274|
|    1283| 39|       854|                  794|        0.9297423887587822|
|    1822|111|       854|                  794|        0.9297423887587822|
|     397| 64|       854|                  584|        0.6838407494145199|
|     339| 64|       854|                  550|        0.6440281030444965|
+--------+---+----------+---------------------+--------------------------+
only showing top 5 rows



With these ratios in the back of our minds, we can now examine the number of consecutive days a product has experienced zero-units sold:

In [None]:
zero_sales_days = (
  
  inventory 
  
    # flag the occurance of first zero sales day in a series
    .withColumn('sales_change_flag', f.expr('''
        CASE 
          WHEN total_sales_units=0 AND LAG(total_sales_units,1) OVER(PARTITION BY store_id, sku ORDER BY date) != 0 THEN 1 
          ELSE 0 
          END''')) 
  
    # count number of zero sales day series to date (associates records with a given series)
    .withColumn('zero_sales_flag_rank', f.expr('SUM(sales_change_flag) OVER(PARTITION BY store_id, sku ORDER BY date)')) 
  
    # flag all zero sales days
    .withColumn('sales_change_flag_inv', f.expr('CASE WHEN total_sales_units = 0 THEN 1 ELSE 0 END')) 
  
    # count consecutive zero sales days (counter resets with a non-zero sales instance)
    .withColumn('total_days_wo_sales', f.expr('SUM(sales_change_flag_inv) OVER(PARTITION BY store_id, sku, zero_sales_flag_rank ORDER BY date)'))
    .withColumn('total_days_wo_sales', f.expr('CASE WHEN total_sales_units != 0 THEN 0 ELSE total_days_wo_sales END'))
    
    .select(
      'date',
      'store_id',
      'sku',
      'total_sales_units',
      'zero_sales_flag_rank',
      'sales_change_flag_inv',
      'total_days_wo_sales'
      )
  )

zero_sales_days.orderBy('store_id','sku','date').show(10)

+----------+--------+---+-----------------+--------------------+---------------------+-------------------+
|      date|store_id|sku|total_sales_units|zero_sales_flag_rank|sales_change_flag_inv|total_days_wo_sales|
+----------+--------+---+-----------------+--------------------+---------------------+-------------------+
|2019-01-01|      63| 57|                0|                   0|                    1|                  1|
|2019-01-02|      63| 57|                0|                   0|                    1|                  2|
|2019-01-03|      63| 57|                0|                   0|                    1|                  3|
|2019-01-04|      63| 57|                0|                   0|                    1|                  4|
|2019-01-05|      63| 57|                3|                   0|                    0|                  0|
|2019-01-06|      63| 57|                2|                   0|                    0|                  0|
|2019-01-07|      63| 57|            

We know the probability that a product will experience a zero-sales date in a given store location.  Using this value, we can calculate the probability of a product would realistically experience consecutive zero-sales days using a simple function.  When that probability reaches or exceeds a particular threshold (set here at 5% to reflect a fairly low risk tolerance), we might take that as a signal to send someone to inspect the inventory:

In [None]:
zero_sales_inventory = (
  zero_sales_days
    .join(zero_sales_totals.alias('prob'), on=['store_id', 'sku'], how = 'leftouter')
    .withColumn('zero_sales_probability', f.expr('pow(zero_sales_day_probability, total_days_wo_sales)'))
    .withColumn('no_sales_flag', f.expr('CASE WHEN zero_sales_probability < 0.05 THEN 1 ELSE 0 END'))
  )

zero_sales_inventory.orderBy('store_id','sku','date').show(5)

+--------+---+----------+-----------------+--------------------+---------------------+-------------------+----------+---------------------+--------------------------+----------------------+-------------+
|store_id|sku|      date|total_sales_units|zero_sales_flag_rank|sales_change_flag_inv|total_days_wo_sales|total_days|total_zero_sales_days|zero_sales_day_probability|zero_sales_probability|no_sales_flag|
+--------+---+----------+-----------------+--------------------+---------------------+-------------------+----------+---------------------+--------------------------+----------------------+-------------+
|      63| 57|2019-01-01|                0|                   0|                    1|                  1|       854|                  654|         0.765807962529274|     0.765807962529274|            0|
|      63| 57|2019-01-02|                0|                   0|                    1|                  2|       854|                  654|         0.765807962529274|    0.586461835473

## Step 8: Identify Alert Conditions

We now have identified several conditions that require attention.  We first identified problematic phantom inventory conditions and then identified inventory below safety stock levels.  Finally, we identified days with zero sales events likely to be a result of an inventory issue.  In this last step, we'll consolidate all this information to build a set with which we can more clearly identify inventory dates requiring attention from analysts:

In [None]:
all_alerts = (
  consolidated_oos_alerts.alias('oos') # OOS alert
    .join(phantom_inventory.alias('pi'), on=['store_id','sku','date'], how='leftouter') # pahntom inventory indicator
    .join(zero_sales_inventory, on=['store_id','sku','date'], how='leftouter') # zero sales alert
    .selectExpr(
      'date',
      'store_id',
      'sku',
      'product_category',
      'oos.total_sales_units',
      'oos.alert_indicator as oos_alert',
      'oos.daily_sales_units',
      'no_sales_flag as zero_sales_flag',
      'phantom_inventory',
      'phantom_inventory_ind'
      )
  )

all_alerts.show(5)

[Stage 420:>                                                        (0 + 1) / 1]

+----------+--------+---+----------------+-----------------+---------+------------------+---------------+-----------------+---------------------+
|      date|store_id|sku|product_category|total_sales_units|oos_alert| daily_sales_units|zero_sales_flag|phantom_inventory|phantom_inventory_ind|
+----------+--------+---+----------------+-----------------+---------+------------------+---------------+-----------------+---------------------+
|2019-05-11|      63| 57|     Category 04|                0|        0|0.8681318681318682|              0|                0|                    0|
|2019-09-12|      63| 57|     Category 04|                0|        0|0.7032967032967034|              0|                0|                    0|
|2020-02-24|      63| 57|     Category 04|                3|        0|0.9560439560439561|              0|               -1|                    0|
|2020-10-25|      63| 57|     Category 04|                2|        0|0.4945054945054945|              0|              -37| 

                                                                                

In [None]:
# (
#   all_alerts
#     .repartition(3)
#     .write
#       .format('parquet')
#       .mode('overwrite')
#       .option('overwriteSchema', 'true')
#       .saveAsTable('osa_inventory_flagged')
#    )
all_alerts.createOrReplaceTempView("osa_inventory_flagged")

## Step 9: Access Data

In the last notebook, we identified inventory problems through the detection of excessive phantom inventory, stocking levels below safety stock thresholds and unexpected numbers of consecutive days with zero sales.  We might label these *out-of-stock* issues in that they identify scenarios where product is simply not available to be sold.

In this notebook, we want to add a fourth inventory scenario, one where insufficient stock may not fully prevent sales but where they may cause us to miss our sales expectations.  Misplacement of product in the store or displays which give the customer a sense a product is not available are both examples of the kinds of issues we might describe as *on-shelf availability* problems.

With this fourth scenario, we will generate a forecast for sales and identify historical sales values that were depressed relative to what was expected. These periods of lower than expected sales may then be investigated as periods potentially experiencing OSA challenges. To generate this forecast, we must first access our historical sales data:

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as f

import pandas as pd
import numpy as np
import math
from datetime import timedelta
	
from statsmodels.tsa.holtwinters import SimpleExpSmoothing

In [None]:
inventory_flagged = spark.table('osa_inventory_flagged')

## Step 10: Generate Forecast

Unlike most forecasting exercises, our goal is not to predict future values but instead to generate *expected* values for the historical period.  To do this, we may make use of a variety of forecasting techniques. Most enterprises already have established preferences for sales forecasting so instead of wading into the conversation about which techniques are best in different scenarios, we will make use of a [simple exponential smoothing](https://en.wikipedia.org/wiki/Exponential_smoothing) as a placeholder technique so that we might focus on the analysis against the forecasted values in later steps.

Our challenge now is to generate a forecast for each store-SKU combination in our dataset.  Leveraging a forecast scaling technique [previously demonstrated](https://databricks.com/blog/2021/04/06/fine-grained-time-series-forecasting-at-scale-with-facebook-prophet-and-apache-spark-updated-for-spark-3.html), we will write a function capable of generating a forecast for a given store-SKU combination and then apply it to all store-SKU combinations in our dataset in a scalable, distributed manner:

In [None]:
alpha_value = 0.8 # smoothing factor

# function to generate a forecast for a store-sku
def get_forecast(keys, inventory_pd: pd.DataFrame) -> pd.DataFrame:
  
  # identify store and sku
  store_id = keys[0]
  sku = keys[1]
  
  # identify date range for predictions
  history_start = inventory_pd['date'].min()
  history_end = inventory_pd['date'].max()
  
  # organize data for model training
  with warnings.catch_warnings(record=True):
      timeseries = (
        inventory_pd
          .set_index('date', drop=True, append=False) # move date to index
          .sort_index() # sort on date-index
        )['total_sales_units'] # just need this one field

  # fit model to timeseries
  model = SimpleExpSmoothing(timeseries, initialization_method='heuristic').fit(smoothing_level=alpha_value, optimized=False)
  
  # predict sales across historical period
  predictions = model.predict(start=history_start, end=history_end)
  
  # convert timeseries to dataframe for return
  predictions_pd = predictions.to_frame(name='predicted_sales_units').reset_index() # convert to df
  predictions_pd.rename(columns={'index':'date'}, inplace=True) # rename 'index' column to 'date'
  predictions_pd['store_id'] = store_id # assign store id
  predictions_pd['sku'] = sku # assign sku
  
  return predictions_pd[['date', 'store_id', 'sku', 'predicted_sales_units']]

# structure of forecast function output
forecast_schema = StructType([
  StructField('date', DateType()), 
  StructField('store_id', IntegerType()), 
  StructField('sku', IntegerType()), 
  StructField('predicted_sales_units', FloatType())
  ])

In [None]:
# get forecasted values for each store-sku combination

forecast = (
  inventory_flagged
    .groupby(['store_id','sku'])
      .applyInPandas(
        get_forecast, 
        schema=forecast_schema
        )
    .withColumn('predicted_sales_units', f.expr('ROUND(predicted_sales_units,0)')) # round values to nearest integer
    )

forecast.show(10)

[Stage 480:>                                                        (0 + 1) / 1]

+----------+--------+---+---------------------+
|      date|store_id|sku|predicted_sales_units|
+----------+--------+---+---------------------+
|2019-01-01|      63| 57|                  0.0|
|2019-01-02|      63| 57|                  0.0|
|2019-01-03|      63| 57|                  0.0|
|2019-01-04|      63| 57|                  0.0|
|2019-01-05|      63| 57|                  0.0|
|2019-01-06|      63| 57|                  2.0|
|2019-01-07|      63| 57|                  2.0|
|2019-01-08|      63| 57|                  0.0|
|2019-01-09|      63| 57|                  0.0|
|2019-01-10|      63| 57|                  2.0|
+----------+--------+---+---------------------+
only showing top 10 rows



  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


In [None]:
forecast.createOrReplaceTempView("osa_inventory_forecast")

  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


## Step 11: Identify *Off* Sales Issues

With forecasts in-hand, we will now look for historical periods where there is not only a lower than expected number of sales (relative to our forecasts) but where this difference grows over a number of days. Identifying these periods may help us identify on-shelf availability (OSA) concerns we may need to address.  

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/osa_tredence_offsales.jpg' width=75%>

Of course, not every missed sales target is an OSA event.  To focus our attention, we will look for periods of sustained misses where the miss is sizeable relative to our expectations.  In the code that follows, we require 4-days of increasing misses with an average daily miss of 20% or more of the expected sales. Some organizations may wish to increase or decrease these threshold requirements depending on the nature of their business:

In [None]:
inventory_forecast = spark.table('osa_inventory_forecast')

osa_flag_output = (
  
  inventory_flagged.alias('inv')
    .join(inventory_forecast.alias('for'), on=['store_id','sku','date'], how='leftouter')
    .selectExpr(
      'inv.*',
      'for.predicted_sales_units'
      )
             
    # calculating difference between forecasted and actual sales units
    .withColumn('units_difference', f.expr('predicted_sales_units - total_sales_units'))
    .withColumn('units_difference', f.expr('COALESCE(units_difference, 0)'))

    # check whether deviation has been increasing over past 4 days
    .withColumn('osa_alert_inc_deviation', f.expr('''
      CASE 
        WHEN units_difference > LAG(units_difference, 1) OVER(PARTITION BY store_id, sku ORDER BY date) AND 
             LAG(units_difference, 1) OVER(PARTITION BY store_id, sku ORDER BY date) > LAG(units_difference, 2) OVER(PARTITION BY store_id, sku ORDER BY date) AND 
             LAG(units_difference, 2) OVER(PARTITION BY store_id, sku ORDER BY date) > LAG(units_difference, 3) OVER(PARTITION BY store_id, sku ORDER BY date)
             THEN 1
        ELSE 0 
        END'''))
    .withColumn('osa_alert_inc_deviation', f.expr('COALESCE(osa_alert_inc_deviation, 0)'))

    # rolling 4 day average of sales units
    .withColumn('sales_4day_avg', f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)'))

    # rolling 4 day average of forecasted units
    .withColumn('predictions_4day_avg', f.expr('AVG(predicted_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)'))

    # calculating deviation in rolling average of sales and forecast units
    .withColumn('deviation', f.expr('(predictions_4day_avg - sales_4day_avg) / (predictions_4day_avg+1)'))
    .withColumn('deviation', f.expr('COALESCE(deviation, 0)'))

    # Considering 20% deviation as the threshold for OSA flag
    .withColumn('off_sales_alert', f.expr('''
      CASE 
        WHEN deviation > 0.20  AND osa_alert_inc_deviation = 1 THEN 1
        ELSE 0
        END'''))

    .select('date', 
            'store_id', 
            'sku', 
            'predicted_sales_units', 
            'off_sales_alert',
            'oos_alert', 
            'zero_sales_flag', 
            'phantom_inventory', 
            'phantom_inventory_ind')
    )

osa_flag_output.show(5)

  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._

+----------+--------+---+---------------------+---------------+---------+---------------+-----------------+---------------------+
|      date|store_id|sku|predicted_sales_units|off_sales_alert|oos_alert|zero_sales_flag|phantom_inventory|phantom_inventory_ind|
+----------+--------+---+---------------------+---------------+---------+---------------+-----------------+---------------------+
|2019-01-01|      63| 57|                  0.0|              0|        0|              0|             NULL|                    0|
|2019-01-02|      63| 57|                  0.0|              0|        0|              0|                0|                    0|
|2019-01-03|      63| 57|                  0.0|              0|        0|              0|                0|                    0|
|2019-01-04|      63| 57|                  0.0|              0|        0|              0|                1|                    1|
|2019-01-05|      63| 57|                  0.0|              0|        0|              0| 

# END
Thank you