In [195]:
import numpy as np
import pandas as pd
np.set_printoptions(suppress=True, precision=4, linewidth=150)
pd.options.display.float_format = '{:,.4f}'.format
pd.options.display.max_colwidth = 100

import duckdb
import openpyxl as op

from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score

import matplotlib as mpl
import matplotlib.pyplot as plt
plt.style.use('seaborn-v0_8-deep')

import os

In [196]:
# DB connection
db_file = r'd:\duckdb\retail_sales.duckdb'
conx = duckdb.connect(database=db_file, read_only=True)

# Inputs
chain_name = 'chain_name'
sku_code = 'sku_code'

week_start = '2024-04-01'
weeks_num = 8
bin_rng = 5

In [197]:
# SKU name
sku_name = conx.execute('select global_sku_name from base_sku where global_sku_code = ?', [sku_code]).fetchone()[0]
weeks_range = [d.date() for d in pd.date_range(start=week_start, periods=weeks_num, freq='W-MON').to_list()]

In [198]:
# SQL queries
with (open('queries\\pos_list.sql', encoding='utf-8') as fn1, open('queries\\sales_data.sql', encoding='utf-8') as fn2):
    sql_pos_list = fn1.read()
    sql_sales_data = fn2.read()

In [199]:
# Limit PoS in the sample to whose where sales were present in 25% of weeks in the period under consideration
pos_list = conx.execute(sql_pos_list, [chain_name, sku_code, weeks_range[0], weeks_range[-1]]).fetch_df()
weeks_limit = np.floor(0.25*len(weeks_range)).astype(np.int64)
pos_list.loc[pos_list['wks'] > weeks_limit, 'filter'] = 1
pos_list.loc[pos_list['wks'] <= weeks_limit, 'filter'] = 0

pos_cleaned = pos_list[pos_list['filter'] == 1].loc[:, 'global_pos_code'].tolist()

sales_data = conx.execute(sql_sales_data, [chain_name, sku_code, weeks_range[0], weeks_range[-1]]).fetch_df()
sales_data['price_bin_range'] = sales_data.apply(lambda x: str(int(x['price_bin'])) + '-' + str(int(x['price_bin']) + bin_rng), axis=1)
sales_data['price_kg'] = sales_data['price_kg'].round(2)
sales_data.loc[sales_data['global_pos_code'].isin(pos_cleaned), 'filter'] = 1
sales_data.loc[~sales_data['global_pos_code'].isin(pos_cleaned), 'filter'] = 0

In [200]:
conx.close()

In [201]:
# Aggregate sales across weeks and price bins
sales_data_agg_1 = sales_data.loc[sales_data['filter'] == 1, sales_data.columns[0:-1]]
sales_data_agg_1 = (sales_data_agg_1
                      .groupby(['wk_num', 'week_start_date', 'price_kg', 'price_bin', 'price_bin_range'], as_index=False)
                      .agg(
                          turnover=pd.NamedAgg(column='kg', aggfunc='mean'),
                          kg=pd.NamedAgg(column='kg', aggfunc='sum'),
                          akb=pd.NamedAgg(column='global_pos_code', aggfunc='count')
                        )
                  )
sales_data_agg_1['price_kg'] = sales_data_agg_1['price_kg'].round(2)

# Aggregate sales across price bins
sales_data_agg_2 = (sales_data_agg_1
              .groupby(['price_bin', 'price_bin_range'], as_index=False)
              .agg(
                  kg=pd.NamedAgg(column='kg', aggfunc='sum'), 
                  akb=pd.NamedAgg(column='akb', aggfunc='sum')
                )
            )

# Limit price levels to whose which cover 1% of PoS/week combinations
pos_limit = np.round(0.01*len(pos_list)*len(weeks_range)).astype(np.int64)
sales_data_agg_2['turnover'] = sales_data_agg_2['kg'] / sales_data_agg_2['akb']
sales_data_agg_2['turnover'] = sales_data_agg_2['turnover'].round(4)
sales_data_agg_2.loc[sales_data_agg_2['akb'] >= pos_limit, 'filter'] = 1
sales_data_agg_2.loc[sales_data_agg_2['akb'] < pos_limit, 'filter'] = 0

price_data = sales_data_agg_2.loc[sales_data_agg_2['filter'] == 1, sales_data_agg_2.columns[0:-1]]
price_data['price_bin'] = price_data['price_bin'] + 0.5*bin_rng

In [202]:
# Estimate PED
X = np.log(price_data[['price_bin']])
y = np.log(price_data['turnover'])
w = price_data['akb']

ols_reg = linear_model.LinearRegression()
ols_reg.fit(X, y, sample_weight=w)
y_pred = ols_reg.predict(X)

res_dict = {
        'SKU': sku_name + ' (' + sku_code + ')',
        'Сеть': chain_name,
        'Даты': f'{weeks_range[0].strftime("%Y.%m.%d")}-{weeks_range[-1].strftime("%Y.%m.%d")}',
        'Недель': f'{len(weeks_range):,d}',
        'Включенные PoS': f'{len(sales_data.loc[sales_data["filter"] == 1, "global_pos_code"].unique()):,d} / {len(pos_list):,d}',
        'Включенные PoS/Wks': f'{len(sales_data.loc[sales_data["filter"] == 1, :]):,d} / {len(pos_cleaned)*len(weeks_range):,d} / {len(pos_list)*len(weeks_range):,d}',
        'Учтенные кг': f'{price_data["kg"].sum():,.0f} / {pos_list["kg"].sum():,.0f}',
        'ln(Q)=': f'{ols_reg.coef_[0]:+.4f}*ln(x) {ols_reg.intercept_:+.4f}',
        'Q=': f'exp({ols_reg.coef_[0]:+.4f}*ln(x) {ols_reg.intercept_:+.4f})',
        'PED': np.round(ols_reg.coef_[0], 4),
        'alpha': np.round(ols_reg.intercept_, 4),
        'R2 (unlogged)': np.round(r2_score(np.exp(y), np.exp(y_pred), sample_weight=w), 4),
        'RMSE (unlogged)': np.round(np.sqrt(mean_squared_error(np.exp(y), np.exp(y_pred), sample_weight=w)), 4)
        }

price_data = pd.concat([price_data, pd.Series(np.round(np.exp(y_pred), 4), index=price_data.index, name='turnover_pred')], axis=1)

In [203]:
# Save results to excel workbook
xls_name = f'reports\\{res_dict["SKU"] + " " + res_dict["Сеть"] + " " + res_dict["Даты"]}.xlsx'
writer = pd.ExcelWriter(xls_name)

pos_list.to_excel(writer, sheet_name='pos_list', index=False)
sales_data.to_excel(writer, sheet_name='sales_data', index=False)
sales_data_agg_1.to_excel(writer, sheet_name='sales_data_agg_1', index=False)
sales_data_agg_2.to_excel(writer, sheet_name='sales_data_agg_2', index=False)
price_data.to_excel(writer, sheet_name='price_data', index=False)
pd.Series(res_dict, name='Показатель').to_excel(writer, sheet_name='results', index=True)

writer.close()

In [204]:
# Prepare a graph in an unlogged space
fig, ax = plt.subplots(figsize=(12, 7), dpi=72, layout='constrained')

ax.scatter(price_data['price_bin'], price_data['turnover'])
ax.plot(price_data['price_bin'], price_data['turnover_pred'], color='#595959', linestyle='dashed')

ax.annotate(f'ln(Q) = {res_dict["PED"]:+.2f}*ln(x){res_dict["alpha"]:+.2f}\nR2 = {res_dict["R2 (unlogged)"]:.2f}',
    xy=(ax.get_xlim()[0], ax.get_ylim()[0]), 
    xytext=(0.80, 0.91),
    textcoords='axes fraction',
    fontsize=10,
    horizontalalignment='left',
    linespacing=1.5
)
ax.set_title(f'{res_dict["SKU"] + " / " + res_dict["Сеть"] + " / " + res_dict["Даты"]}')
ax.set_xlabel('Цена, руб/кг')
ax.set_ylabel('Оборачиваемость в активную неделю')

plt.savefig('output.png', bbox_inches='tight', pad_inches=0.15)
plt.close()

In [205]:
# Excel formatting
wb = op.load_workbook(filename=xls_name)

col_widths = {
    'pos_list': [16, 16, 10, 20, 10, 10, 10],
    'sales_data': [10, 20, 16, 16, 10, 10, 10, 10, 16, 10],
    'sales_data_agg_1': [10, 20, 10, 10, 16, 10, 10, 10],
    'sales_data_agg_2': [10, 16, 10, 10, 10, 10],
    'price_data': [10, 16, 10, 10, 10, 16],
    'results': [20, 80]
}

side = op.styles.Side(border_style=None)
no_border = op.styles.borders.Border(left=side, right=side, top=side, bottom=side)
font_txt = op.styles.Font(name='Arial', size=10)
font_head = op.styles.Font(name='Arial', size=10, bold=True)

for ws in wb:
    for col in ws.iter_cols():
        ws.column_dimensions[op.utils.get_column_letter(col[0].column)].width = col_widths[ws.title][col[0].column - 1]
        for cell in col:
            cell.border = no_border
            if cell.row == 1:
                cell.font = font_head
            else:
                cell.font = font_txt

wb.active = wb['results']
ws.add_image(op.drawing.image.Image('output.png'), 'D2')

wb.save(filename=xls_name)
os.remove('output.png')