In [0]:
import pandas as pd
from collections import defaultdict
import numpy as np
import os



In [0]:
# Ask the Databricks `dbutils` notebook context for the path of the notebook
# that is currently running (requires a Databricks runtime where `dbutils` exists).
home_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
# Drop the last "/"-separated component (presumably the notebook name) so that
# home_path points at the containing folder.
home_path = "/".join(home_path.split('/')[:-1])

In [0]:
def get_parameter(key, default=""):
    """Read a notebook widget value, falling back to ``default``.

    Args:
        key: Name of the widget to look up through ``dbutils.widgets.get``.
        default: Value returned when the lookup raises any ``Exception``.

    Returns:
        The widget value, or ``default`` (after printing a notice) when the
        lookup fails.
    """
    try:
        value = dbutils.widgets.get(key)
    except Exception:
        # Any failure is treated as "no such parameter": report it and use the default.
        print('have no parameter "{}" in dbutils.widgets, return default "{}"'.format(key, default))
        value = default
    return value

class VersionControl:
    """Append-only registry of saved runs, stored in a plain-text log file.

    Every saved run is one line of the form ``ws_type, version, comment``.
    The log lives next to the notebook at
    ``/Workspace<home_path>/save_path_log.txt``.
    """

    def __init__(self, ws_type=None, version=None, comment=None):
        """Remember the run identity and, when ``ws_type`` is given, report
        (via ``print``) whether that version is already in the log.

        Args:
            ws_type: Key under which versions are grouped in the log.
            version: Version label of this run.
            comment: Free-text note stored with the version.
        """
        # Location of the log file; both reads and writes of an instance use it.
        self.path = '/Workspace' + home_path + '/save_path_log.txt'
        self.ws_type = ws_type
        self.version = version
        self.comment = comment

        if self.ws_type:
            self.check_if_version_exists()

    def check_if_version_exists(self, ws_type=None, version=None):
        """Tell whether ``version`` has already been logged for ``ws_type``.

        Args:
            ws_type: Overrides ``self.ws_type`` when truthy.
            version: Overrides ``self.version`` when truthy.

        Returns:
            True when at least one matching entry exists, otherwise False.
            A short status message is printed in every case.
        """
        ws_type = self.ws_type if not ws_type else ws_type
        version = self.version if not version else version
        ws_type_versions = self.find_versions(ws_type)
        if not ws_type_versions:
            print("ws_type has not been used before")
            return False

        # Versions come back from the log as stripped strings, so compare in
        # that form; otherwise an int version would never match its own entry.
        # A missing version (None) keeps matching nothing.
        wanted = None if version is None else str(version).strip()
        results = [x for x in ws_type_versions if x[0] == wanted]
        if results:
            print(results)
            return True
        print("the version has not been saved before")
        return False

    def save_version(self, ws_type=None, version=None, force=False):
        """Append the current ``ws_type, version, comment`` entry to the log.

        Args:
            ws_type: Replaces ``self.ws_type`` when truthy.
            version: Replaces ``self.version`` when truthy.
            force: When true, store the entry even if the same version is
                already logged and carry on.

        Raises:
            ValueError: The version already exists and ``force`` is false.
        """
        self.ws_type = self.ws_type if not ws_type else ws_type
        self.version = self.version if not version else version
        check_status = self.check_if_version_exists()
        # Logical operators, not bitwise ones: `~force` on a bool yields -1/-2
        # and only gave the right truthiness by accident.
        if check_status and not force:
            raise ValueError('version already exists')
        with open(self.path, 'a', encoding='utf-8') as file:
            print(f'{self.ws_type}, {self.version}, {self.comment}')
            file.write(f'\n{self.ws_type}, {self.version}, {self.comment}')

    @staticmethod
    def _read_log_lines(path):
        """Return the raw lines of the log at ``path``; an absent log is empty."""
        try:
            with open(path, 'r', encoding='utf-8') as file:
                return file.readlines()
        except FileNotFoundError:
            # Nothing has been saved yet; this must not block the first save.
            return []

    @staticmethod
    def get_all_versions():
        """Return the raw lines of the default log file (empty list if absent)."""
        return VersionControl._read_log_lines('/Workspace' + home_path + '/save_path_log.txt')

    def get_version_dict(self):
        """Parse this instance's log into ``{ws_type: [(version, comment), ...]}``.

        Blank lines are skipped. Only the first two commas separate fields,
        so a comment may itself contain commas. Missing trailing fields are
        read as empty strings.
        """
        version_dict = defaultdict(list)
        for line in self._read_log_lines(self.path):
            if not line.strip():
                continue
            fields = [field.strip() for field in line.split(",", 2)]
            fields += [''] * (3 - len(fields))
            version_dict[fields[0]].append((fields[1], fields[2]))
        return version_dict

    def find_versions(self, ws_type=None):
        """Reload the log and return the entries recorded for ``ws_type``.

        Returns:
            The whole mapping when ``ws_type`` is falsy; otherwise the list of
            ``(version, comment)`` tuples for that key, or None if the key has
            never been logged.
        """
        self.version_dict = self.get_version_dict()
        if not ws_type:
            return self.version_dict
        return self.version_dict.get(ws_type)

In [0]:
# finance_ds_inventory_dmt.info_order_database_finance_stw # ABI直接卖给经销商的，含t1,t1.5

In [0]:
# T1.5从百威进货部分的售出STR
# finance_ds_inventory_dmt.finance_poc_datahub_mid_t1ws_wccs_str # t1.5 -> POC/T2 sales_type=1卖给T2，sales_type = 0 卖给POC


# T1/T1.5 直接从百威进货部分的售出STR
# # 纯一批经销商的STR落表，sale_type =1是卖给售点，2是卖给2批，3是1.5批卖给售点的(从T1进货的部分)
# finance_ds_inventory_dmt.finance_poc_datahub_t1ws_wccs_str        # T1 -> 卖给T1.5/T2/POC


# T1卖给T1.5的STW
# finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str  # payercode是T1.5的，filter sj_pay_to=T1

# 废弃旧表
# finance_ds_inventory_dmt.finance_poc_datahub_wccs_str    # T1.5 -> poc 只看从ABI进货的

In [0]:
# brand_family_scope_file_from_vivian = "/dbfs/FileStore/tables/srf/fifo_inv_simu/src_data/SAP_sku_CIO_BRANDFAMILY_Mapping_final.xlsx"
inv_data_path = '/dbfs/FileStore/tables/srf/fifo_inv_simu/src_data/进销存数据.xlsx'  # opening inventory
dec_inv_data_path = '/dbfs/FileStore/tables/srf/fifo_inv_simu/src_data/12月进销存数据.xlsx' # December closing inventory (the original note about the field used is cut off mid-sentence)

# Maps a month key (a 'YYYYMM' string) to the Excel file holding that month's closing inventory.
end_inv_data_dict = {
    '202412':dec_inv_data_path
}

In [0]:
def get_ws_name_payercode_mapping():
    """Load the payer code -> wholesaler name lookup.

    Returns:
        A pandas DataFrame indexed by ``payercode`` with a ``payername``
        column, built from the distinct rows of the wholesaler table.
    """
    query = 'select distinct int(wholesaler_pay_code) as payercode,  wholesaler_name as payername from budtech_brewdat_prod_ods.abi_cloud_wholesaler_ws_wholesaler'
    wholesalers = spark.sql(query).toPandas()
    return wholesalers.set_index('payercode')

In [0]:
def get_sku_hl_mapping_table():
    """Load the SAP SKU -> CIO SKU mapping together with a per-unit ``HL`` factor.

    The query keeps CIO SKUs that are not deleted, have a positive numeric
    capacity and pack size, and a non-null segment. ``HL`` is computed in SQL
    as ``container_L * packagesize / 100``, where ``container_L`` is the
    capacity with letters stripped, divided by 1000.

    Returns:
        A pandas DataFrame with columns ``SKUCode``, ``cio_code``,
        ``cio_name`` and ``HL``.
    """
    hl_query = """
    WITH CIO AS (
    SELECT
        DISTINCT cio_id AS cio_code,
        c_product as cio_name,
        regexp_replace(capacity, '([a-zA-Z])', '') / 1000.0 as container_L,
        -- trim(C2.Seg) AS Segment,
        packagesize
    FROM
        bees_products_ods.product_center_cio_sku C1
        LEFT JOIN bees_products_ods.product_center_cio_mapping C2 ON C2.cio_code = C1.cio_id
    WHERE
        deleted = 0
        AND regexp_replace(capacity, '([a-zA-Z])', '') > 0
        AND C2.Seg IS NOT NULL
        AND COALESCE(packagesize, 0) > 0
    )
    SELECT
    DISTINCT SC.sap_sku_code as SKUCode, K1.cio_code
    ,K1.cio_name
    ,K1.container_L * K1.packagesize/100 HL
    FROM
    CIO K1
    JOIN bees_products_ods.product_center_ref_sap_to_cio SC ON SC.cio_sku_code = K1.cio_code
    """
    return spark.sql(hl_query).toPandas()

def get_sku_sap_wccs_code_mapping_table():
    """Load the distinct WCCS ``product_id`` -> SAP ``SKUCode`` pairs.

    Returns:
        A pandas DataFrame with columns ``product_id`` and ``SKUCode``
        (``sap_skudm`` cast to int in SQL).
    """
    mapping_query = """
    select distinct product_id, cast(sap_skudm as int) as SKUCode from wccs_ods.bas_material
    """
    return spark.sql(mapping_query).toPandas()

In [0]:
def get_used_brand_family():
    """Build the SKU scope for the brand families listed in ``used_brand_family``.

    Returns:
        A 3-tuple ``(used_wccs_sku_list, used_brand_family_mapping, product_id_mapping)``:

        * ``used_wccs_sku_list``: upper-cased WCCS product ids of the in-scope
          SKUs. SKUs with no WCCS product id are left out, so the list never
          contains NaN.
        * ``used_brand_family_mapping``: the in-scope SKU mapping (one row per
          SKUCode before the product-id join) with brand family, product id
          and HL columns.
        * ``product_id_mapping``: the full, unfiltered product id / SKU / HL
          mapping.
    """
    # Brand-family assignment per SAP SKU (this table replaced an earlier Excel source).
    sku_brand_family_mapping = spark.sql("select distinct cast(SKU as int) as SKUCode, Brand_Family as `Brand Family`  from bcc_dmotc_ods.str_stw_discount_mapping_sku_cio").toPandas()
    used_brand_family = ['ISP', 'BUD', 'HBI', 'HBO', 'HKOW']
    sku_hl_mapping = get_sku_hl_mapping_table()
    product_id_mapping = get_sku_sap_wccs_code_mapping_table()

    in_scope = sku_brand_family_mapping['Brand Family'].isin(used_brand_family)
    used_brand_family_mapping = sku_brand_family_mapping[in_scope].drop_duplicates(subset=['SKUCode'])[['SKUCode', 'Brand Family']]

    # Attach the WCCS product id and the HL factor to every in-scope SKU.
    product_id_mapping = pd.merge(product_id_mapping, sku_hl_mapping, on='SKUCode', how='left')
    used_brand_family_mapping = pd.merge(used_brand_family_mapping, product_id_mapping, on='SKUCode', how='left')
    used_brand_family_mapping['product_id'] = used_brand_family_mapping['product_id'].str.upper()

    # The left join leaves product_id as NaN for SKUs unknown to WCCS. Drop
    # those here: a NaN in the list would make `isin` accept rows whose
    # material code is missing.
    used_wccs_sku_list = list(used_brand_family_mapping['product_id'].dropna().unique())
    return used_wccs_sku_list, used_brand_family_mapping, product_id_mapping

In [0]:
def get_inv_data(used_wccs_sku_list, sku_filtered, used_qty_col='期初库存', path=inv_data_path):
    """Read an inventory Excel file and convert the chosen quantity column to HL.

    Args:
        used_wccs_sku_list: WCCS product ids to keep (matched against the
            upper-cased ``物料代码`` column).
        sku_filtered: SKU mapping providing ``product_id``, ``SKUCode``,
            ``Brand Family`` and ``HL``.
        used_qty_col: Name of the quantity column to convert; defaults to the
            opening-inventory column ``期初库存``.
        path: Excel file to read.

    Returns:
        The filtered inventory rows with the mapping columns attached and an
        added ``inv_hl`` column (``used_qty_col * HL``).
    """
    begin_inv = pd.read_excel(path)
    begin_inv['物料代码'] = begin_inv['物料代码'].str.upper()
    begin_inv = begin_inv[begin_inv['物料代码'].isin(used_wccs_sku_list)]

    # One mapping row per product id; rows without a product id cannot be joined.
    sku_filtered_info = sku_filtered.sort_values(by='product_id').drop_duplicates(subset=['product_id'])
    sku_filtered_info = sku_filtered_info[sku_filtered_info['product_id'].notnull()]
    begin_inv = pd.merge(begin_inv, sku_filtered_info[['product_id', 'SKUCode', 'Brand Family', 'HL']], left_on='物料代码',right_on='product_id', how='left')

    # Report material codes that found no SKUCode or no HL factor. The
    # right-hand `product_id` is NaN for unmatched rows, so report the
    # left-hand material code to make the message actionable.
    unfound_sku = begin_inv[begin_inv['SKUCode'].isnull() | begin_inv['HL'].isnull()]
    if len(unfound_sku) > 0:
        print("没对应上SKUCode/HL的wccs product id", unfound_sku['物料代码'].unique())
    begin_inv['inv_hl'] = begin_inv[used_qty_col] * begin_inv['HL']    # convert to hectolitres
    return begin_inv

In [0]:
def get_ending_inv_month(month='202412', used_qty_col='期末库存'):
    """Closing inventory in HL, summed per payer code and brand family.

    Args:
        month: Key into ``end_inv_data_dict``; '202412' selects the closing
            inventory file of December 2024.
        used_qty_col: Quantity column to use. Defaults to the closing
            inventory column (in theory the 202501 opening inventory equals
            the 202412 closing inventory).

    Returns:
        A pandas Series of summed ``inv_hl`` indexed by
        (``payercode``, ``Brand Family``).
    """
    # Second element is the brand-filtered SKU mapping; the third (the full
    # SKU/HL mapping) is not needed here.
    sku_ids, brand_skus, _ = get_used_brand_family()
    closing_inv = get_inv_data(sku_ids, brand_skus, used_qty_col=used_qty_col, path=end_inv_data_dict[month])

    # Per the original note: one W code maps to one 300 (payer) code, while
    # one 300 code may map to several W codes.
    ws_to_payer = spark.sql("select distinct wholesalerid as ws_code, int(Pay_to) as payercode  from bcc_baseline_ods.wccs_ws").toPandas()
    closing_with_payer = pd.merge(closing_inv, ws_to_payer, left_on='经销商代码', right_on='ws_code', how='left')
    closing_with_payer['payercode'] = closing_with_payer['payercode'].apply(lambda x: None if np.isnan(x) else int(x))
    return closing_with_payer.groupby(['payercode', 'Brand Family'])['inv_hl'].sum()

In [0]:
def get_stw_data(start=202101, end=202410):
    """Load STW rows whose ``billingyearmonth`` lies in ``[start, end]``.

    Column names are lower-cased before filtering. Per the original notes the
    quantity column is ``stw_qty``, the HL column ``stw_hl`` and the month
    column ``billingyearmonth``.

    Args:
        start: First month to keep (inclusive).
        end: Last month to keep (inclusive).

    Returns:
        The filtered pandas DataFrame.
    """
    raw_stw = spark.sql('select * from finance_ds_inventory_dmt.info_order_database_finance_stw').toPandas()
    raw_stw.columns = [name.lower() for name in raw_stw.columns]
    in_window = (raw_stw['billingyearmonth'] >= start) & (raw_stw['billingyearmonth'] <= end)
    return raw_stw[in_window]

def get_str_data(sku_filtered, str_db_path, start=202101, end=202410):
    """Load STR rows for ``[start, end]`` and add their volume in HL.

    Args:
        sku_filtered: SKU mapping with ``SKUCode`` and ``HL`` columns, used to
            look up the HL factor.
        str_db_path: Either a table name (read with ``select *``), a pandas
            DataFrame of already aggregated data, or any object exposing
            ``toPandas()``. A pandas DataFrame passed in is not modified.
        start: First ``month`` to keep (inclusive).
        end: Last ``month`` to keep (inclusive).

    Returns:
        A pandas DataFrame with lower-cased source columns plus ``SKUCode``,
        ``HL`` and ``str_hl`` (``str_qty * HL``). Rows with no HL factor are
        printed and keep a NaN ``str_hl``.
    """
    if isinstance(str_db_path, str):
        # NOTE(review): the table name is interpolated into SQL; only pass trusted names.
        ws_str = spark.sql(f"select * from {str_db_path}").toPandas()
    elif isinstance(str_db_path, pd.DataFrame):
        # Work on a copy so the caller's frame keeps its dtypes and column names.
        ws_str = str_db_path.copy()
    else:
        ws_str = str_db_path.toPandas()

    # Normalise column names first (as get_stw_data does) so the lookups below
    # do not depend on the source's capitalisation.
    ws_str.columns = [x.lower() for x in ws_str.columns]
    ws_str['payercode'] = ws_str['payercode'].astype(int)
    ws_str = ws_str[(ws_str['month'] >= start) & (ws_str['month'] <= end)]

    sku_filtered_info = sku_filtered.sort_values(by='SKUCode').drop_duplicates(subset=['SKUCode'])
    ws_str = pd.merge(ws_str, sku_filtered_info[['SKUCode', 'HL']], left_on='skucode', right_on='SKUCode', how='left')
    unfound_sku = ws_str[ws_str['HL'].isnull()]
    if len(unfound_sku) > 0:
        print(unfound_sku)
    ws_str['str_hl'] = ws_str['str_qty'] * ws_str['HL']
    return ws_str

In [0]:
def cat_code(df):
    """Join a row's ``t1_code`` and ``t15_code`` into ``"<t1>_<t15>"``.

    Args:
        df: Anything indexable by the keys ``'t1_code'`` and ``'t15_code'``
            (e.g. a DataFrame row).

    Returns:
        The two values converted with ``str`` and joined by an underscore.
    """
    t1, t15 = df['t1_code'], df['t15_code']
    return "_".join([str(t1), str(t15)])

def split_paycode(df):
    """Split a combined ``payercode`` on underscores and return its first two parts.

    Args:
        df: Anything indexable by ``'payercode'`` whose value is a string.

    Returns:
        A 2-tuple of the first and second underscore-separated pieces.

    Raises:
        IndexError: The value contains no underscore.
    """
    pieces = df['payercode'].split("_")
    first_part, second_part = pieces[0], pieces[1]
    return first_part, second_part

In [0]:
def get_all_t1_t15_comb():
    """Return every distinct (T1.5, T1) code pair found in the non-deleted rows
    of the T1 -> T1.5 table.

    The number of pairs fetched (before rows with a missing code are dropped)
    is printed.

    Returns:
        A pandas DataFrame with string columns ``t15_code`` and ``t1_code``.
    """
    pairs = spark.sql('''
        select int(payercode) as t15_code, int(sj_pay_to) as t1_code  
        from finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str 
        where deleted = 0  
        group by all 
        ''').toPandas()
    # Count the fetched rows locally; a separate Spark `count()` would run the
    # aggregate query a second time just to print the same number.
    print(len(pairs))

    # dropna returns a fresh frame, so the assignments below do not write into a slice.
    pairs = pairs.dropna(subset=['t15_code', 't1_code'])
    for code_col in ('t15_code', 't1_code'):
        pairs[code_col] = pairs[code_col].apply(lambda x: str(int(x)))

    return pairs


def get_t15_stw_monthly_from_t1(sku_filtered, start='202101', end='202410'):
    """Monthly volume each T1.5 bought from each T1.

    Source rows describe T1 -> T1.5 sales: ``payercode`` is the T1.5 and
    ``sj_pay_to`` is the T1 it buys from.

    Args:
        sku_filtered: SKU mapping with ``SKUCode`` and ``HL`` columns.
        start: First month to keep (inclusive), as 'YYYYMM' text or an int.
        end: Last month to keep (inclusive), as 'YYYYMM' text or an int.

    Returns:
        A pandas DataFrame with lower-cased columns, ``stw_hl``
        (``str_qty * HL``) and ``payercode`` rewritten to ``"<t1>_<t15>"``.
        Rows with no HL factor are printed and keep a NaN ``stw_hl``.
    """
    print(start, end)
    stw = spark.sql('''
        select  payercode as t15_code, sj_pay_to as t1_code, Brand_Family_ComDB, wccs_sku_name, skucode, month as billingyearmonth, sum(str_qty) as str_qty 
        , payercode, payer_bu_name, payer_region_name
        from finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str 
        where deleted = 0 
        group by all  
        ''').toPandas()
    # TODO (carried over from the original): should payercode be the t1/t15 combination?
    stw = stw.dropna(subset=['t1_code', 't15_code'])
    stw['t1_code'] = stw['t1_code'].apply(lambda x: str(int(x)))
    stw['t15_code'] = stw['t15_code'].apply(lambda x: str(int(x)))

    # Compare months numerically so the filter works whether the column and
    # the bounds arrive as text or as integers.
    month_num = pd.to_numeric(stw['billingyearmonth'], errors='coerce')
    stw = stw[(month_num >= int(start)) & (month_num <= int(end))]

    stw.columns = [x.lower() for x in stw.columns]
    sku_filtered_info = sku_filtered.sort_values(by='SKUCode').drop_duplicates(subset=['SKUCode'])
    stw = pd.merge(stw, sku_filtered_info[['SKUCode', 'HL']], left_on='skucode', right_on='SKUCode', how='left')
    unfound_sku = stw[stw['HL'].isnull()]
    if len(unfound_sku) > 0:
        print(unfound_sku)
    stw['stw_hl'] = stw['str_qty'] * stw['HL']
    # Both codes are already strings, so plain concatenation builds the
    # "<t1>_<t15>" key without a row-wise apply, and it also works when no
    # rows are left.
    stw['payercode'] = stw['t1_code'] + "_" + stw['t15_code']
    return stw

1878


In [0]:
def get_t15_str_monthly(sku_filtered, start='202101', end='202410'):
    """Monthly STR volume of each T1.5, attributed to the T1 it bought from.

    Source rows are T1.5 -> POC sales (``sale_type = 3``): ``sub_payercode``
    is the T1.5 and ``payercode`` is the T1.

    Args:
        sku_filtered: SKU mapping with ``SKUCode`` and ``HL`` columns.
        start: First month to keep (inclusive), as 'YYYYMM' text or an int.
        end: Last month to keep (inclusive), as 'YYYYMM' text or an int.

    Returns:
        A pandas DataFrame with lower-cased columns, ``str_hl``
        (``str_qty * HL``) and ``payercode`` rewritten to ``"<t1>_<t15>"``.
        Rows with no HL factor are printed and keep a NaN ``str_hl``.
    """
    print(start, end)
    ws_str = spark.sql('''
        select payercode as t1_code, sub_payercode as t15_code,  Brand_Family_ComDB, wccs_sku_name, skucode,  month, sum(str_qty) as str_qty , 
        sub_payercode as payercode, payer_bu_name, payer_region_name
        from finance_ds_inventory_dmt.finance_poc_datahub_t1ws_wccs_str 
        where deleted = 0
        and sale_type = 3
        group by all  
        ''').toPandas()
    ws_str = ws_str.dropna(subset=['t1_code', 't15_code'])
    ws_str['t1_code'] = ws_str['t1_code'].apply(lambda x: str(int(x)))
    ws_str['t15_code'] = ws_str['t15_code'].apply(lambda x: str(int(x)))

    # Compare months numerically so the filter works whether the column and
    # the bounds arrive as text or as integers.
    month_num = pd.to_numeric(ws_str['month'], errors='coerce')
    ws_str = ws_str[(month_num >= int(start)) & (month_num <= int(end))]

    ws_str.columns = [x.lower() for x in ws_str.columns]
    sku_filtered_info = sku_filtered.sort_values(by='SKUCode').drop_duplicates(subset=['SKUCode'])
    ws_str = pd.merge(ws_str, sku_filtered_info[['SKUCode', 'HL']], left_on='skucode', right_on='SKUCode', how='left')
    unfound_sku = ws_str[ws_str['HL'].isnull()]
    if len(unfound_sku) > 0:
        print(unfound_sku)
    ws_str['str_hl'] = ws_str['str_qty'] * ws_str['HL']
    # Both codes are already strings, so plain concatenation builds the
    # "<t1>_<t15>" key without a row-wise apply, and it also works when no
    # rows are left.
    ws_str['payercode'] = ws_str['t1_code'] + "_" + ws_str['t15_code']
    return ws_str

In [0]:
# df = spark.sql(f'''
#     select  payercode as t15_code, sj_pay_to as t1_code, Brand_Family_ComDB, wccs_sku_name, skucode, month as billingyearmonth, sum(str_qty) as str_qty 
#     , payercode, payer_bu_name, payer_region_name
#     from finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str 
#     where deleted = 0 
#     group by all  
#     ''')
# df.filter('(t1_code == 30012616) and (t15_code == 30020398) and (Brand_Family_ComDB== "BUD")').display()



t15_code,t1_code,Brand_Family_ComDB,wccs_sku_name,skucode,billingyearmonth,str_qty,payercode,payer_bu_name,payer_region_name
30020398,30012616,BUD,百威金尊9.5度428ML1X12纸箱回瓶装-尊享版,99717,202309,52,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,CT_百威9.7度460ML1X12纸箱回瓶,30882,202312,965,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,百威9.7度460ML1X6纸箱OW瓶装-零售价版,96141,202401,44,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,百威金尊9.5度428ML1X12纸箱回瓶装-尊享版,99717,202310,262,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,CT_百威9.7度460ML1X12纸箱回瓶,30882,202402,365,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,百威9.7度460ML1X6纸箱OW瓶装-零售价版,96141,202309,692,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,百威金尊9.5度428ML1X12纸箱回瓶装-尊享版,99717,202312,54,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,CT_百威9.7度460ML1X12纸箱回瓶,30882,202307,904,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,百威9.7度460ML1X6纸箱OW瓶装-河北北,92547,202307,1512,30020398,北区|BU_N,河北北区|Region_Hebei_North
30020398,30012616,BUD,百威金尊9.5度428ML1X12纸箱回瓶装-尊享版,99717,202402,42,30020398,北区|BU_N,河北北区|Region_Hebei_North


In [0]:
def get_t1_t15_brand_mapping(level='brand'):
    """List which T1 supplies which T1.5, per brand family or per SKU.

    Args:
        level: ``'brand'`` gives one row per (T1.5, T1, brand family). Any
            other value (e.g. ``'sku'``) works per SKU and rewrites
            ``Brand_Family_ComDB`` to ``"<brand family>_<skucode>"``.

    Returns:
        A pandas DataFrame with integer ``t15_code`` and ``t1_code`` columns
        and ``Brand_Family_ComDB`` (plus ``skucode`` at SKU level). Rows with
        a missing code are dropped.
    """
    # Reference query kept from the original: how many T1s supply a given SKU
    # held by a T1.5.
    # with table as (
    # select payercode, sj_pay_to, Brand_Family_ComDB, skucode from (
    # select int(payercode), int(sj_pay_to), Brand_Family_ComDB, int(skucode), row_number() over (partition by payercode, sj_pay_to, Brand_Family_ComDB, skucode order by month) as rank
    # from finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str ) where rank = 1)

    # select payercode as t15_code, skucode, count(sj_pay_to) as t1_count from table
    # group by payercode, skucode
    # order by sum(sj_pay_to) desc
    by_brand = level == 'brand'
    if by_brand:
        query = """
        select payercode as t15_code, int(sj_pay_to) as t1_code, Brand_Family_ComDB from (
        select int(payercode), int(sj_pay_to), Brand_Family_ComDB, row_number() over (partition by payercode, sj_pay_to, Brand_Family_ComDB order by month) as rank
        from finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str where deleted = 0  ) where rank = 1
        """
    else:
        query = """
        select payercode as t15_code, int(sj_pay_to) as t1_code, Brand_Family_ComDB, skucode from (
        select int(payercode), int(sj_pay_to), Brand_Family_ComDB, skucode, row_number() over (partition by payercode, sj_pay_to, Brand_Family_ComDB, skucode order by month) as rank
        from finance_ds_inventory_dmt.finance_poc_datahub_mid_ws_wccs_str where deleted = 0) where rank = 1
        """
    mapping = spark.sql(query).toPandas()

    if not by_brand:
        # At SKU level the brand label becomes "<brand family>_<skucode>".
        mapping['Brand_Family_ComDB'] = mapping.apply(
            lambda row: "_".join([row['Brand_Family_ComDB'], str(row['skucode'])]),
            axis=1,
        )

    # Keep only rows where both codes are present, then make them integers.
    has_both_codes = mapping['t15_code'].notnull() & mapping['t1_code'].notnull()
    mapping = mapping[has_both_codes]
    mapping['t1_code'] = mapping['t1_code'].astype(int)
    mapping['t15_code'] = mapping['t15_code'].astype(int)
    return mapping

In [0]:
# from pyspark.sql.functions import col

In [0]:
# result_df = spark.read.parquet('/mnt/srf/inv/fifo_inventory_details_month12_t1_t15_v20250117')
# t15 = result_df.filter(col('Openning Balance').isNotNull()).toPandas()
# t15['Openning Balance'].sum()

272884.24105326

In [0]:
# begin_inv2[begin_inv2['payercode'].isin(t15['t15_code'].unique())]['inv_hl'].sum()

568344.5007492801