In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit,minimize
import warnings
warnings.filterwarnings("ignore")

# Pricing 
## Preparation

In [3]:
# DATA PATH
path = 'Pricing/'
output_path = 'RESULT/'
file_name = 'Pricing Test Data.xlsx' # input file : with md bounds and inv bounds

In [4]:
# read data 
data = pd.read_excel(path+file_name,header = 0)
data.columns = ['SKU','MSRP', 'DATE', 'SALES_QTY','SALES_AMT','MD_LOWER_BOUND','MD_UPPER_BOUND','INV']
# calculate MD
data['MSRP2'] = data['SALES_AMT']/data['SALES_QTY'] # calculate current price
data['TEMP_MP'] = 1 - data['MSRP2']/data['MSRP']
data['MD'] = data['TEMP_MP'].apply(lambda x: 0 if (x < 0) else x)

## Fitting MD-Sold curve

In [5]:
# md-sold function:3 types
def linear(x,a,b):
    return a * x + b

def exp(x,a,b):
    return a * np.exp(b*x)

def log(x,a,b):
    return a + b*np.log(x+1)

# fit function
def reg(func,X,y):
    fitParams,_ = curve_fit(func,X,y) # 拟合曲线，func在上面的linear exp log1里面选
    y_pred = func(X,fitParams[0],fitParams[1]) # 计算拟合后的y'
    mse = np.mean((y-y_pred)**2) # 计算MSE
    return fitParams,mse,y_pred

# demand function
def demand_func(msrp,func,fitParams,x):
    # FORMULA: demand = msrp * (1-md) * sold(md)
    demand = msrp * (1-x) * func(x,fitParams[0],fitParams[1]) 
    return demand

# plot function
def plot_fit(X,y,y_pred):
    plt.plot(X,y,"o",markersize = 1,alpha = 0.8,label='actual') # 原input数据
    plt.plot(X,y_pred,'ro',markersize = 1,alpha = 1,label='predict') # 拟合数据
    plt.legend()

In [6]:
MD = [] 
DEMAND_SALES = [] 
# fitting sold curve
for i in data['SKU'].unique():
    df = data[data['SKU'] == i]

    # prepare fitting data
    X = np.array(df['MD'])
    y = np.array(df['SALES_QTY'])
    msrp = df['MSRP'].unique()[0]
    md_lower_bound = df['MD_LOWER_BOUND'].unique()[0]
    md_upper_bound = df['MD_UPPER_BOUND'].unique()[0]
    inventory = df['INV'].unique()[0]
    testx = np.arange(0,1,0.05) # test md 0 0.05 0.01 ... 1
    MSE = 1000000 # 设定一个初始MSE
    
    # choose func with min(MSE)
    for func in [linear, exp, log]:
        Params,mse,y_pred = reg(func,X,y) # fit curve
        # choose the one with min(MSE)
        if mse < MSE:
            MSE = mse
            fitParams = Params
            Y_pred = y_pred
            demand = demand_func(msrp,func,fitParams,testx) # calculate demand of each testmd
            sales_amt = func(testx,fitParams[0],fitParams[1]) # calculate sales_qty of each testmd
            FUNC = func
            func_name = func.__name__
            
    md = [i,msrp,md_lower_bound,md_upper_bound,inventory,fitParams[0],fitParams[1],func_name,MSE] 
    MD.append(md)
    
    demand_sales = [i]
    demand_sales.extend(demand.tolist()) # 不同sku在不同md下的demand矩阵
    demand_sales.extend(sales_amt.tolist()) # 不同sku在不同md下的sales_QTY矩阵
    DEMAND_SALES.append(demand_sales) 

# rename MD    
columns_name_MD = ['SKU','MSRP','MD_LOWER_BOUND','MD_UPPER_BOUND','INV','a','b','METHOD','MSE']
MD = pd.DataFrame(MD,columns = columns_name_MD)

# rename DEMAND_SALES
columns_name_SALES = ['SKU']
columns_name_SALES.extend(['MD='+str(round(i,2))+' D' for i in testx])
columns_name_SALES.extend(['MD='+str(round(i,2))+' S' for i in testx])
DEMAND_SALES = pd.DataFrame(DEMAND_SALES,columns=columns_name_SALES)

## Finding Best MD

In [7]:
def sales(x):
    # calculate sales for a MD series, output a list
    sales = [eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i]) for i in MD.index]
    return sales

def demand(x):
    # calculate demand for a MD series, output a list
    demand = [MD['MSRP'][i] * (1 - x[i]) * (eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i])) for i in MD.index]
    return demand

def demand_count(x):
    # calculate total demand for a MD series, output a float
    Demand = sum(demand(x))
    return -Demand/10**7 # 取负号: 因为scipy.optimize.minimize求最小值 /10000000: 避免demand数量级太大，optimize跑不出来

def weighted_MD(x):
    # calculate weighted_MD for a MD series, output a float
    weighted_MD = 1- (-demand_count(x)*(10**7)) / (sum(MD['MSRP'] * sales(x)))
    return weighted_MD

def inventory(x):
    # calculate total inventory for a MD series, output a float
    inv = MD['INV'] - sales(x)
    return sum(inv)

### Scenario 1: max **demand**

In [8]:
# define bounds

# MD bounds 
bounds = [[MD['MD_LOWER_BOUND'][i],MD['MD_UPPER_BOUND'][i]] for i in range(len(MD))]

# inventory bounds
inv_cons = []
for i in MD.index:
    def constraint_function(x, i=i):
        return MD['INV'][i] - eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i])
    inv_cons.append({'fun': constraint_function, 'type': 'ineq'})

In [9]:
x0 = np.array([0]*len(MD))
solution = minimize(demand_count, x0, method='SLSQP', constraints=inv_cons, bounds= bounds)
if solution.success:
    print('max demand is', - solution['fun'] * (10**7))
    # store result
    md_list = [solution['x']]
    result_list = ['MAX_DEMAND']
else:
    # if optimization fails, print its reason
    print(solution.message)

max demand is 2693277.755122124


### Scenario 2: min **Weighted_MD**

In [9]:
x0 = np.array(solution['x'])
solution2 = minimize(weighted_MD, x0, method = 'SLSQP', constraints=inv_cons, bounds = bounds)
if solution2.success:
    print('min weighted_MD is', weighted_MD(solution2['x']))
    # store result
    md_list.append(solution2['x'])
    result_list.append('MIN_W_MD')
else:
    # if optimization fails, print its reason
    print(solution2.message)

min weighted_MD is 0.2523081089908191


### Scenario 3: max **Weighted_MD**

In [10]:
def weighted_MD2(x):
    weighted_MD = 1- (-demand_count(x)*(10**7)) / (sum(MD['MSRP'] * sales(x)))
    return -weighted_MD

solution3 = minimize(weighted_MD2, x0, method = 'SLSQP', constraints=inv_cons, bounds = bounds)
if solution3.success:
    # store result
    print('max weighted_MD is', weighted_MD(solution3['x']))
    md_list.append(solution3['x'])
    result_list.append('MAX_W_MD')
else:
    # if optimization fails, print its reason
    print(solution3.message)

max weighted_MD is 0.3992942657562182


### Scenario 4-6: **Target weighted_MD/demand/inventory**

In [1]:
# setting target weighted md
target_w_MD = 0.3
# setting target weighted md
target_demand = 2.5 * (10**6)
# setting target inventory
target_inventory = 1000

## 注意：设置的target weighted_MD/demand需要在范围内

In [12]:
# define weighted_md constrain
def MD_const(x):
    return target_w_MD - weighted_MD(x)

# define ttl demand constrain
def demand_const(x):
    return -demand_count(x) - target_demand/(10 ** 7)

# define ttl inventory constrain
def inv_const(x):
    return target_inventory - inventory(x)

In [13]:
# target weighted md
cons = [{'fun': MD_const, 'type': 'ineq'}]
cons.extend(inv_cons)

x0 = np.array(solution2['x'])
solution4 = minimize(demand_count, x0, method='SLSQP', constraints=cons, bounds= bounds)
if solution4.success:
    # store result
    md_list.append(solution4['x'])
    result_list.append('TGT_MD')
else:
    # if optimization fails, print its reason
    print(solution4.message)

In [14]:
# target demand
cons = [{'fun': demand_const, 'type': 'ineq'}]
cons.extend(inv_cons)

x0 = np.array(solution2['x'])
solution5 = minimize(weighted_MD, x0, method='SLSQP', constraints=cons, bounds= bounds)
if solution5.success:
    # store result)
    md_list.append(solution5['x'])
    result_list.append('TGT_DEMAND')
else:
    # if optimization fails, print its reason
    print(solution5.message)

In [15]:
# target inventory
cons = [{'fun': inv_const, 'type': 'ineq'}]
cons.extend(inv_cons)

x0 = np.array(solution2['x'])
solution6 = minimize(weighted_MD, x0, method='SLSQP', constraints=cons, bounds= bounds)
if solution6.success:
    # store result
    md_list.append(solution6['x'])
    result_list.append('TGT_INV')
else:
    # if optimization fails, print its reason
    print(solution6.message)

### Scenario 7-10: Dual Objective

In [16]:
from pymoo.optimize import minimize

In [17]:
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.algorithms.moo.rnsga2 import RNSGA2
from pymoo.decomposition.asf import ASF
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.visualization.scatter import Scatter

In [18]:
class MyProblem_wmd_demand(ElementwiseProblem):
    # define problem with 2 objectives: weighted md & demand
    def __init__(self):
        super().__init__(n_var=len(MD),   # 变量数
                         n_obj=2,   # 目标数
                         n_constr= len(MD)+1,    
                         xl=np.array(MD['MD_LOWER_BOUND']),     # MD下界
                         xu=np.array(MD['MD_UPPER_BOUND']),   # MD上界
                         )

    def _evaluate(self, x, out, *args, **kwargs):
        # define objectives
        f1 = weighted_MD(x) 
        f2 = demand_count(x)

        # define constrains
        ## add ttl inventory constrain
        g0 = -inv_const(x)
        G = [g0]
        ## add inventory bounds
        for i in MD.index:
            gi = eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i]) - MD['INV'][i]
            G.append(gi)

        # todo
        out["F"] = [f1, f2]
        out['G'] = G

class MyProblem_demand_inv(ElementwiseProblem):
    # define problem with 2 objectives: demand & inventory
    def __init__(self):
        super().__init__(n_var=len(MD),   # 变量数
                         n_obj=2,   # 目标数
                         n_constr= len(MD)+1,    
                         xl=np.array(MD['MD_LOWER_BOUND']),     # MD下界
                         xu=np.array(MD['MD_UPPER_BOUND']),   # MD上界
                         )

    def _evaluate(self, x, out, *args, **kwargs):
        # define objectives
        f1 = demand_count(x) 
        f2 = inventory(x)

        # define constrains
        ## add weighted constrain
        g0 = -MD_const(x)
        G = [g0]
        ## add inventory bounds
        for i in MD.index:
            gi = eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i]) - MD['INV'][i]
            G.append(gi)
        # todo
        out["F"] = [f1, f2]
        out['G'] = G

class MyProblem_wmd_inv(ElementwiseProblem):
    # define problem with 2 objectives: weighted md & inventory
    def __init__(self):
        super().__init__(n_var=len(MD),   # 变量数
                         n_obj=2,   # 目标数
                         n_constr= len(MD) + 1,    
                         xl=np.array(MD['MD_LOWER_BOUND']),     # MD下界
                         xu=np.array(MD['MD_UPPER_BOUND']),   # MD上界
                         )

    def _evaluate(self, x, out, *args, **kwargs):
        # define objectives
        f1 = weighted_MD(x) 
        f2 = inventory(x)

        # define constrains
        ## add demand constr
        g0 = -demand_const(x)
        G = [g0]
        ## add inventory bounds
        for i in MD.index:
            gi = eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i]) - MD['INV'][i]
            G.append(gi)

        # todo
        out["F"] = [f1, f2]
        out['G'] = G

class MyProblem_wmd_demand_inv(ElementwiseProblem):
    # define problem with 3 objectives: weighted md & demand & inventory
    def __init__(self):
        super().__init__(n_var=len(MD),   # 变量数
                         n_obj=3,   # 目标数
                         n_constr= len(MD),    
                         xl=np.array(MD['MD_LOWER_BOUND']),     # MD下界
                         xu=np.array(MD['MD_UPPER_BOUND']),   # MD上界
                         )

    def _evaluate(self, x, out, *args, **kwargs):
        # define objectives
        f1 = weighted_MD(x) 
        f2 = demand_count(x)
        f3 = inventory(x)

        # add inventory bounds
        G = []
        for i in MD.index:
            gi = eval(MD['METHOD'][i])(x[i],MD['a'][i],MD['b'][i]) - MD['INV'][i]
            G.append(gi)

        # todo
        out["F"] = [f1, f2, f3]
        out['G'] = G

problem1 = MyProblem_wmd_demand()
problem2 = MyProblem_demand_inv()
problem3 = MyProblem_wmd_inv()
problem4 = MyProblem_wmd_demand_inv()

In [19]:
# Define reference points
ref_points = np.array([[target_w_MD, -target_demand/(10**7)]]) # 可以设定多个

# Get Algorithm
algorithm = RNSGA2(
    ref_points=ref_points,
    pop_size=100,
    epsilon=0.01,
    normalization='front',
    extreme_points_as_reference_points=False,)
    #weights=np.array([0.5, 0.5]))

algorithm2 = NSGA2(
    pop_size=100,
    n_offsprings=10,
    eliminate_duplicates=True
)

In [20]:
# optimize problem with 2 objectives: weighted md & demand
res1 = minimize(problem1, algorithm, save_history=True, termination=('n_gen', 200), seed=1, disp=False, verbose = False)
X1 = res1.X # 自变量取值（pre SKU MD）
F1 = res1.F # 目标函数取值 （f1 f2）

In [21]:
# optimize problem with 2 objectives: demand & inventory
res2 = minimize(problem2, algorithm2, save_history=True, termination=('n_gen', 200), seed=1, disp=False, verbose = False)
X2 = res2.X # 自变量取值（pre SKU MD）
F2 = res2.F # 目标函数取值 （f1 f2）

In [22]:
# optimize problem with 2 objectives: weighted md & inventory
ref_points = np.array([[target_w_MD, target_inventory]]) # set new reference point

res3 = minimize(problem3, algorithm, save_history=True, termination=('n_gen', 200), seed=1, disp=False, verbose = False)
X3 = res3.X # 自变量取值（pre SKU MD）
F3 = res3.F # 目标函数取值 （f1 f2）

In [23]:
# optimize problem with 3 objectives: weighted md & demand & inventory
## create the reference directions to be used for the optimization
ref_dirs = get_reference_directions("das-dennis", 3, n_partitions=12)

## create the algorithm object
algorithm3 = NSGA3(pop_size=92,
                  ref_dirs=ref_dirs)

res4 = minimize(problem4, algorithm3, save_history=True, termination=('n_gen', 200), seed=1, disp=False, verbose = False)
X4 = res4.X
F4 = res4.F

In [24]:
def choosing_best_point(F,weights_list):
    # min-max scale
    fl = F.min(axis=0)
    fu = F.max(axis=0)
    nF = (F - fl) / (fu - fl)

    # 定义两个目标的权重
    weights = np.array(weights_list) # 两个目标==重要
    # ASF：选择增广尺度函数(Augmented Scalarization Function, ASF)分解方法
    decomp = ASF()
    i = decomp(nF, weights).argmin() # 选择最小的ASF值

    return i


In [25]:
# print result of problem with 2 objectives: weighted md & demand
point1 = choosing_best_point(F1,[0.5,0.5])
print("problem weighted_md & demand \ni = %s\nF[weighted_md -demand/(10^7)] = %s \n" % (point1, F1[point1]))
md_list.append(X1[point1])
result_list.append('W_MD_DEMAND')

problem weighted_md & demand 
i = 75
F[weighted_md -demand/(10^7)] = [ 0.31850301 -0.25195189] 



In [26]:
# print result of problem with 2 objectives: demand & inventory
point2 = choosing_best_point(F2,[0.5,0.5])
print("problem demand & inventory \ni = %s\nF[weighted_md inventory] = %s" % (point2, F2[point2]))
md_list.append(X2[point2])
result_list.append('DEMAND_INV')

problem demand & inventory 
i = 0
F[weighted_md inventory] = [-2.24847107e-01  1.57899776e+03]


In [27]:
# print result of problem with 2 objectives: weighted md & inventory
point3 = choosing_best_point(F3,[0.5,0.5])
print("problem weighted_md & inventory \ni = %s\nF[weighted_md inventory] = %s" % (point3, F3[point3]))
md_list.append(X3[point3])
result_list.append('W_MD_INV')

problem weighted_md & inventory 
i = 97
F[weighted_md inventory] = [3.21935557e-01 5.56312316e+02]


In [28]:
# print result of problem with 3 objectives: weighted md & demand & inventory
point4 = choosing_best_point(F4,[1/3,1/3,1/3])
print("problem weighted_md & demand & inventory \ni = %s\nF[weighted_md demand inventory] = %s" % (point4, F4[point4]))
md_list.append(X4[point4])
result_list.append('W_MD_DEMAND_INV')

problem weighted_md & demand & inventory 
i = 10
F[weighted_md demand inventory] = [ 2.98753827e-01 -2.35048883e-01  1.25516007e+03]


## Output Result

In [29]:
def optimize_result(x, name):
    # output a result dataframe for a MD series
    optimize_result = pd.DataFrame()
    optimize_result['SKU'] = MD['SKU']
    optimize_result[name+'_MD'] = x
    optimize_result[name+'_SALES'] = sales(x)
    optimize_result[name+'_DEMAND'] = demand(x)
    return optimize_result

In [30]:
# generate SUMMARY sheet
summary = pd.DataFrame(index = result_list)
summary['DEMAND'] = [sum(demand(i)) for i in md_list]
summary['SALES_QTY'] = [sum(sales(i)) for i in md_list]
summary['INVENTORY'] = [inventory(i) for i in md_list]
summary['WEIGHTED_MD'] = [weighted_MD(i) for i in md_list]

# generate RESULT sheet
for i in range(len(md_list)-1):
    MD = pd.merge(MD, optimize_result(md_list[i+1],result_list[i+1]),on='SKU')

In [31]:
# write to excel
with pd.ExcelWriter(path + output_path + 'Pricing_Result.xlsx', engine='xlsxwriter') as writer:
    summary.to_excel(writer, sheet_name = 'SUMMARY', index = True)
    MD.to_excel(writer, sheet_name = 'RESULT', index = False)
    DEMAND_SALES.to_excel(writer, sheet_name= 'DEMAND_SALES',index= False)