# Demand Planning KPI Monitor

In [1]:
import os, sys, gc, datetime, time

import pandas as pd
# Use fully-qualified option keys. The short forms ('max_columns', 'max_rows')
# are resolved by pattern matching and become ambiguous on pandas versions that
# also define `styler.render.max_columns` / `styler.render.max_rows`, which
# makes `set_option` raise OptionError on a fresh kernel.
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 200)
from pandas.plotting import register_matplotlib_converters
from pandas import ExcelWriter

import numpy as np
import matplotlib.pyplot as plt
import matplotlib. dates as mdates
%matplotlib inline

import seaborn as sns
plt.style.use('seaborn')
%config InlineBackend.figure_format = 'retina'

register_matplotlib_converters()

from pyspark.sql import SparkSession
from impala.dbapi import connect

In [2]:
## Resolve the run date: MONITOR_RUN_DATE from the environment wins,
## otherwise fall back to yesterday. All dates are compact YYYYMMDD strings.

tfmt = '%Y%m%d'

if 'MONITOR_RUN_DATE' in os.environ:
    print('Using external parameters.')
    _end = os.environ.get('MONITOR_RUN_DATE')
else:
    print('Using default parameters.')
    _end = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime(tfmt)

# The consistency window covers the 60 days leading up to the run date.
_end_day = datetime.datetime.strptime(_end, tfmt).date()
_start = (_end_day - datetime.timedelta(days=60)).strftime(tfmt)
date_str = _end

CONSISTENCY_START = _start
CONSISTENCY_END = _end
OOS_CHECK_DATE = _end

print('Consistency:', CONSISTENCY_START, CONSISTENCY_END, sep='\t')
print('OOS_CHECK_DATE:', OOS_CHECK_DATE, sep='\t')

Using default parameters.
Consistency:	20190724	20190922
OOS_CHECK_DATE:	20190922


In [3]:
# Folder the consistency workbook is written to (concatenated with the file
# name below, so the trailing slash is required).
# NOTE(review): absolute, machine-specific path — consider making it configurable.
record_folder = '/data/jupyter/Carrefour-China-Supply-Chain-Forecast/output/monitoring/'

# Workbook name; one file per run date (date_str is the YYYYMMDD run date).
consistency_file = f'report_consistency_items_{date_str}.xlsx'

In [4]:
# Log when the report was generated, then start the wall-clock timer (T0).
_generated_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('Report generation time:', _generated_at, end='\n\n')
T0 = time.time()

Report generation time: 2019-09-23 08:46:05



---

In [5]:
# Ship the Kudu connector jar with the PySpark driver; this must be set before
# the session (and therefore the JVM) is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars /data/jupyter/kudu-spark2_2.11-1.8.0.jar pyspark-shell'
warehouse_location = os.path.abspath('spark-warehouse')

# Hive-enabled session used by every query in this notebook.
# The executor count is set through `spark.executor.instances`; the previous
# key `spark.num.executors` is not a Spark property and was silently ignored.
spark = (
    SparkSession.builder
    .appName("Forecast consistency check")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("spark.blacklist.enabled", False)
    .config("spark.driver.memory", '6g')
    .config("spark.executor.memory", '6g')
    .config("spark.executor.instances", '14')
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:
# Kudu-backed Impala tables that the SQL files reference.
kudu_tables = [
    'lfms.daily_dctrxn', 'lfms.daily_dcstock', 'lfms.ord', 'lfms.daily_shipment'
]

# Kudu master quorum; loop-invariant, so defined once.
kudu_master = "dtla1apps11:7051,dtla1apps12:7051,dtla1apps13:7051"

# Expose each Kudu table to Spark SQL as a temp view named `<db>_<table>`
# (the dot is replaced with an underscore). `createOrReplaceTempView`
# replaces the deprecated `registerTempTable` and is safe to re-run.
for tbl in kudu_tables:
    (
        spark.read.format('org.apache.kudu.spark.kudu')
        .option('kudu.master', kudu_master)
        .option('kudu.table', f'impala::{tbl}')
        .load()
        .createOrReplaceTempView(tbl.replace('.', '_'))
    )

In [7]:
def get_query(sql_path, kudu_replace=None, **query_params):
    """Load a SQL template from disk and fill in its placeholders.

    Parameters
    ----------
    sql_path : str
        Path of the SQL file to read.
    kudu_replace : dict, optional
        Literal ``{old: new}`` substitutions applied to the raw text before
        formatting.
    **query_params
        Values substituted into the template with ``str.format``.

    Returns
    -------
    str
        The rendered query text.
    """
    with open(sql_path, 'r') as sql_file:
        template = sql_file.read()

    substitutions = kudu_replace.items() if kudu_replace is not None else ()
    for old_text, new_text in substitutions:
        template = template.replace(old_text, new_text)

    return template.format(**query_params)

In [8]:
def read_query_and_fetch(sql_path, create_table=False, get_query=False, kudu_replace=None, **query_params):
    """Render a SQL file and either return the text or run it through Spark.

    Parameters
    ----------
    sql_path : str
        Path of the SQL file to read.
    create_table : bool, default False
        When False, every line containing ``drop table`` or ``create table``
        (case-insensitive) is removed before the query is rendered.
    get_query : bool, default False
        When truthy, return the rendered SQL string instead of executing it.
    kudu_replace : dict, optional
        Literal ``{old: new}`` substitutions applied to the raw text first.
    **query_params
        Values substituted into the template with ``str.format``.

    Returns
    -------
    str or pandas.DataFrame
        The rendered query, or the result of ``spark.sql(query).toPandas()``.
    """
    with open(sql_path, 'r') as sql_file:
        sql_text = sql_file.read()

    if kudu_replace is not None:
        for old_text, new_text in kudu_replace.items():
            sql_text = sql_text.replace(old_text, new_text)

    if not create_table:
        # Drop the DDL lines so only the SELECT part is left.
        kept_lines = []
        for line in sql_text.split('\n'):
            lowered = line.lower()
            if 'drop table' in lowered or 'create table' in lowered:
                continue
            kept_lines.append(line)
        sql_text = '\n'.join(kept_lines)

    sql_text = sql_text.format(**query_params)
    return sql_text if get_query else spark.sql(sql_text).toPandas()

In [None]:
def run_sql_with_impala(sql):
    """Execute a single SQL statement on Impala and discard any result.

    Parameters
    ----------
    sql : str
        Statement passed unchanged to ``cursor.execute``.

    Notes
    -----
    The user and password can be supplied through the ``IMPALA_USER`` and
    ``IMPALA_PASSWORD`` environment variables. The literals below remain only
    as fallbacks so existing runs keep working.
    NOTE(review): credentials committed to a notebook should be rotated and
    the fallbacks removed once the environment variables are in place.
    """
    user = os.environ.get('IMPALA_USER', 'CHEXT10211')
    password = os.environ.get('IMPALA_PASSWORD', 'datalake2019')
    with connect(host='dtla1apps14', port=21050, auth_mechanism='PLAIN', user=user, password=password,
                 database='vartefact') as conn:
        curr = conn.cursor()
        try:
            curr.execute(sql)
        finally:
            # Release the server-side operation even when execute() raises.
            curr.close()

## Consistency

In [None]:
def to_dt(x):
    """Parse compact ``YYYYMMDD`` value(s) with ``pd.to_datetime``."""
    return pd.to_datetime(x, format='%Y%m%d')


def process_for_consistency(sim_hist):
    """Compare final order quantities with the quantities planned weeks earlier.

    A row whose ``order_day`` equals its ``run_date`` is treated as the final
    order. For each lead time of 2 to 7 weeks, the quantity of the row with the
    same ``sub_id`` and ``order_day`` whose ``run_date`` lies exactly that many
    weeks earlier is attached. Results are summed per item and order week
    (weeks start on Monday), and a relative error is computed per lead time.

    Parameters
    ----------
    sim_hist : pandas.DataFrame
        Must provide ``sub_id``, ``item_id``, ``item_code``, ``item_name``,
        ``barcode``, ``rotation``, ``supplier_name``, ``order_day``,
        ``run_date`` (both ``YYYYMMDD``) and ``order_qty_box``.

    Returns
    -------
    pandas.DataFrame
        One row per item and order week with ``order_qty (box)`` and, for each
        lead time ``i``, ``Week-i_before_run_date``, ``Week-i_before_order_qty``
        and ``Week-i_before_error``. An empty frame is returned when no row has
        ``order_day == run_date``.
    """
    weeks = range(2, 8)  # lead times, in weeks, that are compared

    ord0 = sim_hist[sim_hist.order_day == sim_hist.run_date]
    if ord0.shape[0] == 0:
        return pd.DataFrame()

    chk = ord0[['sub_id', 'item_id', 'item_code', 'item_name', 'barcode', 'rotation',
                'supplier_name', 'order_day', 'order_qty_box']] \
        .rename(columns={'order_qty_box': 'order_qty (box)'}).copy()

    # Days between the run and the order it plans; identical for every lead
    # time, so it is computed once instead of on each loop iteration.
    lead_days = (to_dt(sim_hist.order_day) - to_dt(sim_hist.run_date)).dt.days
    for i in weeks:
        ord1 = sim_hist[lead_days == i * 7]
        chk = chk.merge(ord1[['sub_id', 'order_day', 'order_qty_box']]
                        .rename(columns={'order_qty_box': f'Week-{i}_before_order_qty'}),
                        on=['sub_id', 'order_day'], how='left')

    ## aggregate by order week
    chk['order_day'] = to_dt(chk.order_day)
    chk['order_week'] = (chk.order_day - pd.to_timedelta(chk.order_day.dt.weekday, unit='days')
                        ).dt.strftime('%Y%m%d')
    # ISO week number. `Series.dt.week` no longer exists in pandas 2.x, while
    # `Timestamp.isocalendar()` is available on every pandas version.
    iso_week = chk['order_day'].map(lambda day: day.isocalendar()[1])
    chk['Week of year'] = 'W' + iso_week.astype(str)
    chk = chk.drop(columns=['order_day', 'sub_id'])
    # skipna=False keeps a week total as NaN when any contributing row lacks a
    # planned quantity, instead of silently treating it as zero.
    chk = chk.groupby(['item_id', 'item_code', 'item_name', 'barcode', 'rotation',
                       'supplier_name', 'order_week', 'Week of year']).agg(lambda x: x.sum(skipna=False)).reset_index()

    for i in weeks:  # calculate consistency error
        # The small epsilon guards against division by zero.
        chk[f'Week-{i}_before_error'] = (
            chk[f'Week-{i}_before_order_qty'] -
            chk['order_qty (box)']) / (chk['order_qty (box)'] + 1.e-6)
        chk[f'Week-{i}_before_run_date'] = (to_dt(chk.order_week) -
                                            datetime.timedelta(days=i * 7)).dt.strftime('%m/%d/%Y')

    columns = ['item_id', 'item_code', 'item_name', 'barcode', 'rotation',
               'supplier_name', 'order_week', 'order_qty (box)', 'Week of year']
    for i in weeks:
        columns += [f'Week-{i}_before_run_date', f'Week-{i}_before_order_qty', f'Week-{i}_before_error']
    return chk[columns]

def xavier_method_v2(forecast, actual):
    """Return ``sum(|actual - forecast|) / (sum(actual) + 1e-6)``.

    The denominator is the total *actual* quantity; the small epsilon avoids a
    division by zero when that total is 0.
    """
    total_abs_diff = (actual - forecast).abs().sum()
    total_actual = actual.sum()
    return total_abs_diff / (total_actual + 1.e-6)

In [None]:
## Only trigger on Sunday
## Builds the weekly consistency workbook and prints the week-2 consistency
## figures (overall and per flow A / B / X).

if datetime.datetime.strptime(date_str, '%Y%m%d').date().weekday() == 6:
    order_hist = spark.sql(get_query(
        'sql/kpi_consistency_order_hist.sql',
        database_name='vartefact',
        CONSISTENCY_START=CONSISTENCY_START,
        CONSISTENCY_END=CONSISTENCY_END,
    )).toPandas()

    ## On-stock flows (A, B): convert unit quantities to whole boxes
    _onstock = order_hist[order_hist.rotation.isin(['A', 'B'])].copy()
    _onstock['order_qty_box'] = np.ceil(_onstock['order_qty'].astype('f8') / _onstock['pcb'].astype('f8'))
    chk_onstock = process_for_consistency(_onstock)

    ## Cross-docking flow (X): box quantities are summed per order line first
    _xdock = order_hist[order_hist.rotation == 'X'].copy()
    _xdock['order_qty_box'] = np.ceil(_xdock['order_qty'].astype('f8') / _xdock['pcb'].astype('f8'))
    _xdocking = (_xdock.groupby(['order_day', 'run_date', 'sub_id', 'rotation', 'item_id', 'item_code',
                                 'supplier_name', 'barcode', 'item_name'])['order_qty_box'].sum().reset_index())
    chk_xdocking = process_for_consistency(_xdocking)

    if chk_xdocking.shape[0] > 0 and chk_onstock.shape[0] > 0:
        ## Merge, keep only latest week, save to excel
        chk = pd.concat([chk_onstock, chk_xdocking], axis=0)
        chk = chk[chk.order_week == chk.order_week.max()]
        chk.sort_values(by='item_code') \
           .to_excel(record_folder + consistency_file,
                     sheet_name='template', index=False)
        ## Print overall consistency (only print week 2)
        flow_a = chk.query('rotation == "A"')
        flow_b = chk.query('rotation == "B"')
        flow_x = chk.query('rotation == "X"')
        ## xavier_method_v2 returns a ratio, so `{:.2%}` is used to scale it to
        ## a percentage; previously the raw ratio was printed with a '%' suffix
        ## on the flow lines and without one on the overall line.
        print('Consistency (week 2):\nOverall: {:.2%}\nFlow A: {:.2%}\nFlow B: {:.2%}\nFlow X: {:.2%}'
              .format(
                  xavier_method_v2(chk['Week-2_before_order_qty'], chk['order_qty (box)']),
                  xavier_method_v2(flow_a['Week-2_before_order_qty'], flow_a['order_qty (box)']),
                  xavier_method_v2(flow_b['Week-2_before_order_qty'], flow_b['order_qty (box)']),
                  xavier_method_v2(flow_x['Week-2_before_order_qty'], flow_x['order_qty (box)'])
              ))
    else:
        print(f'There is not enough order history data between {CONSISTENCY_START} and '
              f'{CONSISTENCY_END} to calculate consistency.')
else:
    print('Today is not Sunday. Will not calculate consistency.')

---

In [None]:
# End-of-report timestamp (seconds since the epoch); paired with T0 to time the run.
T1 = time.time()

---

In [None]:
# Wall-clock seconds elapsed between the T0 and T1 timestamps.
print(f'Generating consistency report takes {T1-T0:.2f} seconds.')