#Set Up

##Libraries

In [0]:
# SPARK LIBRARIES 
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window


# WARNINGS AND LOGISTICS
import warnings

from IPython.core.display import HTML

# Keep each row of a wide `DataFrame.show()` on one line: force <pre> output
# blocks in the notebook to never wrap.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Silence every Python warning for the rest of this notebook session.
warnings.filterwarnings("ignore")

#Fix False Positives

##Function: read_delta_table

In [0]:
def read_delta_table(table_name, spark_session=None):
    """
    Description:
        read a table by name if it can be read; otherwise print a message
        and return None.

    Args:
        table_name (str): catalog name of the delta table (e.g. 'db.table').
        spark_session (SparkSession, optional): session used for the read.
            Defaults to the notebook-global `spark` session.

    Returns:
        df (pyspark.sql.dataframe): the table as a dataframe, or None if the
            read raised an error.
    """
    # Only look up the notebook-global `spark` when no session is supplied.
    session = spark if spark_session is None else spark_session

    # try to read table
    try:
        df = session.read.table(table_name)
    # Fail gracefully on ordinary errors only. The previous bare `except:` also
    # swallowed KeyboardInterrupt/SystemExit.
    except Exception as err:
        print(table_name, 'table not found. Ensure you have the right path.')
        # Surface the underlying cause (e.g. a permission error) rather than
        # hiding it behind "not found".
        print('Reason:', err)
        return None

    return df

##Function: daily_delivery_execution

In [0]:
def daily_delivery_execution(df):
    """
    Description:
        roll the raw delivery execution table up to one row per
        date / material / customer.

    Args:
        df (pyspark.sql.dataframe): raw delivery execution table containing
            'date', 'material', 'customer', 'ordered' and 'delivered' columns.

    Returns:
        df_daily (pyspark.sql.dataframe): one row per (date, material, customer)
            with 'ordered' and 'delivered' summed under their original names.
    """
    group_keys = ['date', 'material', 'customer']
    # Sum each quantity column and keep its original name.
    daily_totals = [F.sum(qty).alias(qty) for qty in ('ordered', 'delivered')]

    return df.groupBy(*group_keys).agg(*daily_totals)

##Function: fix_fp_orders

In [0]:
def fix_fp_orders(df_daily):
    """
    Description:
        collapse repeated (false positive) orders into a single demand value.

        Rows with ordered <= 0 are dropped first. Within each
        material/customer series, ordered by date, a new group starts on every
        row whose previous row had delivered > 0. Each group becomes one row,
        dated at the group's earliest date, with the group's largest 'ordered'
        value as 'demand'.

    Args:
        df_daily (pyspark.sql.dataframe): daily aggregated delivery execution
            table with 'date', 'material', 'customer', 'ordered' and
            'delivered' columns.

    Returns:
        df_fp_fix (pyspark.sql.dataframe): columns material, customer, date,
            demand — daily demand with false positives removed.
    """
    key_cols = ['material', 'customer']
    # per material/customer series, walked in date order
    by_date = Window.partitionBy(*key_cols).orderBy('date')

    # Each step is kept as its own column: Spark does not allow a window
    # function (lag) to be nested inside a windowed aggregate (sum).
    grouped = (
        df_daily
        # rows with no orders are returns, not true demand
        .filter(F.col('ordered') > 0)
        # delivered quantity of the previous row in the series
        .withColumn('delivered_lag', F.lag('delivered', 1).over(by_date))
        # 1 when the previous row was delivered (starts a new group); the first
        # row has no previous row, so its null lag falls through to 0
        .withColumn('flag', F.when(F.col('delivered_lag') > 0, 1).otherwise(0))
        # running total of the flag labels each group
        .withColumn('flag_group', F.sum('flag').over(by_date))
    )

    return (
        grouped
        .groupBy(*key_cols, 'flag_group')
        .agg(
            F.min('date').alias('date'),       # first date of the group
            F.max('ordered').alias('demand'),  # largest order in the group
        )
        .drop('flag_group')
    )

#Aggregate to Weekly Time Series

##Function: agg_weekly_demand

In [0]:
def agg_weekly_demand(df, week_end_day=6):
    """
    Description:
        aggregate demand to a weekly time series. Every date is moved forward
        to the nearest `week_end_day` on or after it (Friday by default). Demand
        is then summed per material, customer and week-ending date.

    Args:
        df (pyspark.sql.dataframe): daily demand with false positives removed,
            with 'material', 'customer', 'date' and 'demand' columns.
        week_end_day (int): day of week that closes each weekly bucket, in
            Spark `dayofweek` numbering (1 = Sunday ... 7 = Saturday).
            Defaults to 6 (Friday).

    Returns:
        df_week (pyspark.sql.dataframe): weekly demand by material and customer;
            the week-ending date is in 'ts_date'.

    Raises:
        ValueError: if week_end_day is not an integer from 1 to 7.
    """
    if not isinstance(week_end_day, int) or not 1 <= week_end_day <= 7:
        raise ValueError(
            f"week_end_day must be an int from 1 (Sunday) to 7 (Saturday), got {week_end_day!r}"
        )

    # Number of days to reach the next week_end_day (0 if the date already is
    # one). dayofweek is 1..7, so adding 7 keeps the operand non-negative before
    # the modulo. For Friday (6): Sun +5, Mon +4, ..., Fri +0, Sat +6.
    days_ahead = (week_end_day - F.dayofweek(F.col('date')) + 7) % 7

    # calculate the week-ending date for each row
    df_week = df.withColumn('ts_date', F.date_add(F.col('date'), days_ahead))

    # sum demand within each week
    df_week = (df_week.groupBy('material', 'customer', 'ts_date')
               .agg(F.sum('demand').alias('demand')))

    return df_week


#Write Tables

In [0]:
# source (raw delivery execution) and output table names
table_name = 'ltf_db.ltf_delivery_execution'
daily_table_name = 'ltf_db.ltf_demand_daily'
weekly_table_name = 'ltf_db.ltf_demand_weekly'

# read raw delivery execution from the delta table
de = read_delta_table(table_name)
# read_delta_table prints a message and returns None when the read fails.
# Stop here with a clear error rather than failing later with an
# AttributeError on None.
if de is None:
    raise RuntimeError(
        f"Source table {table_name!r} could not be read; demand tables were not written."
    )

# aggregate to daily level
de_daily = daily_delivery_execution(de)
# fix false positives
demand_daily = fix_fp_orders(de_daily)
# write daily demand table
demand_daily.write.mode('overwrite').saveAsTable(daily_table_name)

# aggregate by week
demand_weekly = agg_weekly_demand(demand_daily)
# write weekly demand table
demand_weekly.write.mode('overwrite').saveAsTable(weekly_table_name)

#Appendix

In [0]:
# #raw data
# df_raw = spark.read.table('ltf_db.ltf_delivery_execution')
# df_raw.filter( (df_raw.material == 151988)  & (df_raw.customer == 500292145) ).display()

date,material,customer,ordered,delivered
2019-02-20,151988,500292145,17.0,17.0
2019-06-30,151988,500292145,17.0,17.0
2019-08-19,151988,500292145,17.0,17.0
2021-05-18,151988,500292145,51.0,51.0
2019-03-03,151988,500292145,34.0,34.0
2019-01-04,151988,500292145,34.0,34.0
2019-08-26,151988,500292145,17.0,17.0
2019-07-08,151988,500292145,17.0,17.0
2019-08-01,151988,500292145,17.0,17.0
2019-06-19,151988,500292145,17.0,17.0


In [0]:
# #daily data
# df_day = spark.read.table('ltf_db.ltf_demand_daily')
# df_day.filter( (df_day.material == 151988)  & (df_day.customer == 500292145) ).display()

material,customer,date,demand
151988,500292145,2019-01-04,34.0
151988,500292145,2019-01-16,17.0
151988,500292145,2019-01-31,17.0
151988,500292145,2019-02-13,17.0
151988,500292145,2019-02-20,17.0
151988,500292145,2019-03-03,34.0
151988,500292145,2019-03-14,17.0
151988,500292145,2019-03-24,17.0
151988,500292145,2019-04-04,17.0
151988,500292145,2019-04-21,17.0


In [0]:
# #weekly data
# df_week = spark.read.table('ltf_db.ltf_demand_weekly')
# df_week.filter( (df_week.material == 151988)  & (df_week.customer == 500292145) ).display()

material,customer,ts_date,demand
151988,500292145,2019-01-04,34.0
151988,500292145,2019-01-18,17.0
151988,500292145,2019-02-01,17.0
151988,500292145,2019-02-15,17.0
151988,500292145,2019-02-22,17.0
151988,500292145,2019-03-08,34.0
151988,500292145,2019-03-15,17.0
151988,500292145,2019-03-29,17.0
151988,500292145,2019-04-05,17.0
151988,500292145,2019-04-26,34.0
