# Demand Forecasting (AI, Data Innovation Team, CJ Express, TILDI)

## File: Human Baseline Model

 C.J. Express Group Co.,Ltd. All Rights Reserved.
  
 Year: 2021

In [4]:
import os

# Environment for the Spark session created below: JDK 8 and a local
# Spark 3.0.2 (Hadoop 2.7) installation.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.2-bin-hadoop2.7",
})

In [5]:
import findspark

# findspark.init() has to run before pyspark is imported: it is what makes
# the Spark install referenced by SPARK_HOME importable.
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session ("local[*]" master).
spark = (SparkSession.builder
         .master("local[*]")
         .getOrCreate())
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f6e9c671e50>


In [6]:
# import library
from google.cloud import bigquery
from datetime import date, timedelta

import numpy as np
import pandas as pd
from graphframes import *
from itertools import combinations

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

from pyspark.sql import Row, SparkSession, functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import *
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

import math
import time
import warnings
# Ignores every warning category for the whole interpreter from here on
# (including e.g. numpy divide-by-zero RuntimeWarnings); remove or narrow
# this while debugging so such warnings become visible.
warnings.filterwarnings('ignore')

# Helper Functions

In [7]:
def extract_date(df_extract_date, columns='SalDate'):
    """Append calendar features derived from a date column.

    Adds the integer columns ``DayofMonth``, ``Yearday``, ``Month``,
    ``DayofWeek``, ``Year``, ``Quarter`` and ``WeekOfYear``.

    :param df_extract_date: Spark DataFrame containing the date column.
    :param columns: Name of the single date column to derive features from
        (despite the plural name, this is one column name).
    :return: Spark DataFrame with the feature columns appended.
    """
    date_col = F.col(columns)
    return (df_extract_date
            # F.dayofmonth instead of splitting the text on '-': it goes through
            # Spark's date handling like every other feature here, so it also
            # works when the value carries a time part ("2020-12-01 10:00:00"),
            # where the third '-' token would be "01 10:00:00" and cast to null.
            .withColumn('DayofMonth', F.dayofmonth(date_col))
            .withColumn('Yearday', F.dayofyear(date_col))
            .withColumn('Month', F.month(date_col))
            .withColumn('DayofWeek', F.dayofweek(date_col))
            .withColumn('Year', F.year(date_col))
            .withColumn('Quarter', F.quarter(date_col))
            .withColumn('WeekOfYear', F.weekofyear(date_col)))

def stratified_split_train_test(df, frac, label, join_on, seed=42):
    """Draw a stratified sample of ``df`` by ``label`` and return it with its complement.

    Every distinct value of ``label`` is sampled with the same fraction
    ``frac`` through ``DataFrame.stat.sampleBy``. The complement is every row
    of ``df`` whose ``join_on`` key does not occur in the sample (a left-anti
    join), so ``join_on`` should uniquely identify rows.

    :param df: Spark DataFrame to split.
    :param frac: Sampling fraction applied to every label value.
    :param label: Column to stratify on.
    :param join_on: Column name(s) used to find the rows left out of the sample.
    :param seed: Random seed passed to ``sampleBy``.
    :return: Tuple ``(sampled_df, remaining_df)``.
    """
    strata = df.select(label).distinct()
    fractions = {row[0]: frac for row in strata.collect()}

    sampled = df.stat.sampleBy(label, fractions, seed)
    remaining = df.join(sampled, on=join_on, how="left_anti")
    return sampled, remaining

def cast_double_types(df_cast, c=None):
    """Cast the given columns of a Spark DataFrame to ``double``.

    :param df_cast: Spark DataFrame.
    :param c: Iterable of column names to cast. ``None`` (the default) casts
        nothing; a ``None`` default replaces the former mutable ``[]`` default.
    :return: Spark DataFrame with the listed columns replaced by their
        ``double``-typed versions; other columns are unchanged.
    """
    if c is None:
        c = ()
    # ``name`` rather than ``col`` so pyspark's ``col`` function is not shadowed.
    for name in c:
        df_cast = df_cast.withColumn(name, F.col(name).cast("double"))
    return df_cast

def cast_int_types(df_cast, c=None):
    """Cast the given columns of a Spark DataFrame to ``int``.

    :param df_cast: Spark DataFrame.
    :param c: Iterable of column names to cast. ``None`` (the default) casts
        nothing; a ``None`` default replaces the former mutable ``[]`` default.
    :return: Spark DataFrame with the listed columns replaced by their
        ``int``-typed versions; other columns are unchanged.
    """
    if c is None:
        c = ()
    # ``name`` rather than ``col`` so pyspark's ``col`` function is not shadowed.
    for name in c:
        df_cast = df_cast.withColumn(name, F.col(name).cast("int"))
    return df_cast

def cast_date_types(df_cast, c=None):
    """Cast the given columns of a Spark DataFrame to ``DateType``.

    :param df_cast: Spark DataFrame.
    :param c: Iterable of column names to cast. ``None`` (the default) casts
        nothing; a ``None`` default replaces the former mutable ``[]`` default.
    :return: Spark DataFrame with the listed columns replaced by their
        ``DateType`` versions; other columns are unchanged.
    """
    if c is None:
        c = ()
    # ``name`` rather than ``col`` so pyspark's ``col`` function is not shadowed.
    for name in c:
        df_cast = df_cast.withColumn(name, F.col(name).cast(DateType()))
    return df_cast

def percentage_error(actual, predicted):
    """Element-wise relative error of ``predicted`` against ``actual``.

    Where ``actual[j] != 0`` the result is
    ``|actual[j] - predicted[j]| / actual[j]``. Where ``actual[j] == 0`` the
    relative error is undefined, so the prediction is scaled by the mean of
    ``actual`` instead: ``predicted[j] / mean(actual)``.

    Denominators are not wrapped in ``abs``: negative actuals (or a negative
    mean) yield negative entries. If every actual is 0 the mean is 0 and the
    zero-actual entries become ``inf``/``nan``, as before.

    :param actual: Array-like of true values (lists are accepted).
    :param predicted: Array-like of predictions, same shape as ``actual``.
    :return: float64 ndarray with the same shape as ``actual``.
    """
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    res = np.empty(actual.shape)

    nonzero = actual != 0
    res[nonzero] = (np.absolute(actual[nonzero] - predicted[nonzero])
                    / actual[nonzero])

    zero = ~nonzero
    if zero.any():
        # mean(actual) is loop-invariant: compute it once instead of once per
        # zero entry (the previous loop was O(n) per zero, O(n^2) worst case).
        res[zero] = predicted[zero] / np.mean(actual)
    return res

def mape(y_true, y_pred):
    """Mean absolute percentage error, in percent, for in-memory arrays.

    Per-element errors come from ``percentage_error``, so an entry whose true
    value is 0 contributes ``|predicted / mean(y_true)|`` instead of being
    skipped.

    :param y_true: Array-like of true values.
    :param y_pred: Array-like of predicted values.
    :return: MAPE as a percentage (e.g. ``12.5`` for 12.5 %).
    """
    per_item = percentage_error(np.asarray(y_true), np.asarray(y_pred))
    return 100 * np.mean(np.abs(per_item))

def rmsle_spark(df, label='TotalQtySale'):
    """Root mean squared logarithmic error of ``prediction`` against ``label``.

    Computes ``sqrt(mean((log(prediction + 1) - log(label + 1)) ** 2))``. The
    mean is aggregated in Spark; only the square root runs on the driver.

    :param df: Spark DataFrame with a ``prediction`` column and the label column.
    :param label: Name of the ground-truth column.
    :return: RMSLE as a numpy float.
    """
    log_pred = F.log(F.col('prediction') + F.lit(1))
    log_true = F.log(F.col(label) + F.lit(1))
    squared_log_error = ((log_pred - log_true) ** 2).alias('sle')

    msle = (df.select(squared_log_error)
              .agg(F.avg('sle').alias('msle'))
              .collect()[0][0])
    return np.sqrt(msle)

def mape_spark(df, label='TotalQtySale'):
    """Mean absolute percentage error, in percent, computed in Spark.

    Per row: ``|prediction - label| * 100 / |label|``. A row whose label is 0
    (or null) has no defined percentage error; it is set to null explicitly and
    is therefore ignored by ``avg``. This differs from ``mape``, which scales
    such rows by the mean label.

    :param df: Spark DataFrame with a ``prediction`` column and the label column.
    :param label: Name of the ground-truth column.
    :return: MAPE as a percentage, or ``None`` if no row has a non-zero label.
    """
    actual = F.col(label)
    abs_pct_error = (F.abs(F.col('prediction') - actual) * F.lit(100)
                     / F.abs(actual))  # |label|: negative labels must not give negative APE
    # Guard the division explicitly instead of relying on Spark returning null
    # for x / 0, which raises when spark.sql.ansi.enabled is true.
    ape = F.when(actual != 0, abs_pct_error).alias('ape')
    return df.select(ape).agg(F.avg('ape').alias('mape')).collect()[0][0]
  
def smape_spark(df, label='TotalQtySale'):
    """Symmetric mean absolute percentage error, in percent, computed in Spark.

    Per row: ``|prediction - label| * 100 / ((|prediction| + |label|) / 2)``.
    The absolute values in the denominator follow the standard SMAPE
    definition; without them, a negative prediction or label could make a term
    negative or blow it up. Rows where the denominator is 0 (both values 0)
    are set to null and ignored by ``avg``.

    :param df: Spark DataFrame with a ``prediction`` column and the label column.
    :param label: Name of the ground-truth column.
    :return: SMAPE as a percentage, or ``None`` if every denominator is 0.
    """
    actual = F.col(label)
    predicted = F.col('prediction')
    denominator = (F.abs(predicted) + F.abs(actual)) / F.lit(2)
    # Explicit zero guard rather than relying on null-on-divide-by-zero
    # (which raises under spark.sql.ansi.enabled).
    sape = F.when(denominator != 0,
                  F.abs(predicted - actual) * F.lit(100) / denominator).alias('sape')
    return df.select(sape).agg(F.avg('sape').alias('smape')).collect()[0][0]

# Main

In [11]:
# --- Feature definitions ------------------------------------------------------
label_cols = ['TotalQtySale']
numeric_features = [
    'avgPriceDis', 'avgPrice',
    'Yearday', 'Month', 'DayofWeek', 'Year', 'Quarter', 'WeekOfYear', 'DayofMonth',
]
string_features = ['BranchCode', 'MaterialCode', 'types']  # 'types' is the promotion type

# Columns main() selects from the raw sales CSV before joining location data.
select_variables_initial = [
    'BranchCode', 'MaterialCode', 'Date', 'avgPriceDis',
    'avgPrice', 'supPrice', 'label', 'totalNetSale',
    'TotalQtySale', 'types', 'Branch', 'Name',
]

# --- Model identifiers and the file names derived from them -------------------
version = 'BASE_MODEL_001'
versionFeaturePL = "BASE_PL_MODEL_001"
Pipeline_Model_PATH = "model_pipeline_%s.plmodel" % versionFeaturePL
Model_PATH = "model_ml_%s.plmodel" % version

# --- Input files --------------------------------------------------------------
data = './tildi_demandforecasting_type1_poc.csv'
data_location = './location.csv'
data_category = './category.csv'

# Rows dated before this are training data; rows on/after it are test data.
split_date = '2020-12-01'

# --- Output: metrics appended by main() ---------------------------------------
result_list = []

def main():
    """Evaluate the per-material average-sales baseline on the test period.

    Steps:
      1. Load the sales, location and category CSVs (paths from the module
         globals ``data``, ``data_location`` and ``data_category``).
      2. Keep ``select_variables_initial``, left-join location on
         ``BranchCode``, cast column types and add calendar features.
      3. Split on ``split_date``: rows before it are training, the rest test.
      4. Predict each test row's ``TotalQtySale`` with the training mean of
         its ``MaterialCode`` across all stores.
      5. Print RMSE, MAE, MAPE, RMSLE, a derived accuracy and the
         out-of-stock percentage.

    Side effects: uses the module-level ``spark`` session and appends
    ``[rmse, mae, mape, rmsle, acc]`` to the module-level ``result_list``
    (repeated calls keep accumulating into it).

    :return: None
    """
    def read_csv(path):
        # Header row gives column names; all columns are read as strings.
        return spark.read.format("csv").option("header", "true").load(path)

    # --- Load inputs ----------------------------------------------------------
    df = read_csv(data)
    df_location = read_csv(data_location)
    df_category = read_csv(data_category)  # loaded and cached, but not used below

    df.cache()
    df_location.cache()
    df_category.cache()

    # --- Prepare features -----------------------------------------------------
    df = df.select(select_variables_initial)
    df = df.join(df_location, how='left', on=['BranchCode'])
    df = df.dropna(how='all')

    df = cast_double_types(df, ['avgPriceDis', 'avgPrice', 'TotalQtySale',
                                'supPrice', 'label', 'totalNetSale'])
    df = cast_int_types(df, ['label', 'ZipCode'])
    df = cast_date_types(df, ['Date'])
    df = extract_date(df, 'Date')
    # NOTE(review): an unused ``df.dropna()`` was previously computed here; the
    # split below runs on ``df`` including rows with nulls. Confirm intent.

    # --- Train/test split by date ---------------------------------------------
    df_train = df.filter(df["Date"] < split_date)
    df_test = df.filter(df["Date"] >= split_date)

    # --- Baseline features: training means per material over all stores ------
    df_train_avg_all_store = (df_train.groupBy("MaterialCode")
                              .agg(F.avg("TotalQtySale")
                                   .alias('prediction_avg_all_store')))
    df_train_avg_dow_all_store = (df_train.groupBy("MaterialCode", "DayofWeek")
                                  .agg(F.avg("TotalQtySale")
                                       .alias('prediction_avg_dow_all_store')))

    df_test2 = (df_test
                .join(df_train_avg_all_store, how='left', on=['MaterialCode'])
                .join(df_train_avg_dow_all_store, how='left',
                      on=['MaterialCode', 'DayofWeek']))
    # Drops test rows with a null in ANY column (materials unseen in training,
    # branches missing from location.csv, ...). This defines the evaluation
    # set, which is also why the DayofWeek join above is kept.
    df_test2 = df_test2.dropna()

    # Alternative baseline: df_test2["prediction_avg_dow_all_store"].
    df_predict = df_test2.withColumn("prediction",
                                     df_test2["prediction_avg_all_store"])
    # Every metric below is a separate Spark action; cache so the joins and
    # aggregations are not recomputed for each one.
    df_predict.cache()

    try:
        # --- Performance evaluation: overall ------------------------------------
        for metricName in ['rmse', 'mae']:
            evaluator = RegressionEvaluator(labelCol="TotalQtySale",
                                            predictionCol="prediction",
                                            metricName=metricName)
            result = evaluator.evaluate(df_predict)
            print('%s = %g' % (metricName, result))
            result_list.append(result)

        mape_value = mape_spark(df_predict, label='TotalQtySale')
        result_list.append(mape_value)
        print("mape =", mape_value)

        rmsle_value = rmsle_spark(df_predict, label='TotalQtySale')
        result_list.append(rmsle_value)
        print("rmsle =", rmsle_value)

        # Ad-hoc "accuracy" = 100 * (2 - exp(rmsle)): 100 when rmsle is 0 and
        # negative once rmsle exceeds ln 2.
        acc = (1 - (np.exp(rmsle_value) - 1)) * 100
        result_list.append(acc)
        print("acc =", acc)

        print(result_list)
        print("Inference Process: Done")

        # --- Performance evaluation: out of stock (OOS) -------------------------
        # OOS rows: non-zero sales where the baseline predicted fewer units
        # than were actually sold.
        overall_test_number = df_predict.count()
        oos_number = (df_predict
                      .filter((df_predict["prediction"] < df_predict["TotalQtySale"])
                              & (df_predict["TotalQtySale"] != 0))
                      .count())
        if overall_test_number:
            print("percent of oos", oos_number * 100 / overall_test_number)
        else:
            print("percent of oos: no test rows to evaluate")
    finally:
        df_predict.unpersist()

    return None

# entry point for CJ PySpark ML application
if __name__ == '__main__':
    # perf_counter is a monotonic clock meant for measuring durations; unlike
    # time.time() it is not affected by system clock adjustments.
    start_time = time.perf_counter()
    main()
    print("--- %s seconds ---" % (time.perf_counter() - start_time))

rmse = 3.46269
mae = 0.622755
mape = 77.8636734280258
rmsle = 0.4317214181538035
acc = 46.00939344735213
[3.462687450866297, 0.6227547545467854, 77.8636734280258, 0.4317214181538035, 46.00939344735213]
Inference Process: Done
percent of oos 15.433566951024853
--- 132.91115593910217 seconds ---
