In [1]:
# Importing required packages

import pandas as pd
import numpy as np

import gc
import os



In [2]:
def add_gc_count(df):
    """Count distinct transactions in ``df``, netting out refund transactions.

    A transaction is a distinct (``pos_busn_dt``, ``mcd_gbal_lcat_id_nu``, ``pos_ord_nu``)
    triple. Triples whose ``terr_pos_trn_typ_cd`` is one of the refund codes '014' / '114'
    are subtracted twice from the total distinct count (presumably so that a refund cancels
    both itself and the sale it reverses -- confirm with the business definition).

    Returns
    -------
    int
        ``distinct_transactions - 2 * distinct_refund_transactions``.
    """
    trn_key = ['pos_busn_dt', 'mcd_gbal_lcat_id_nu', 'pos_ord_nu']
    is_refund = df['terr_pos_trn_typ_cd'].isin(['014', '114'])

    n_total = len(df[trn_key].drop_duplicates())
    n_refunds = len(df.loc[is_refund, trn_key].drop_duplicates())

    return n_total - 2 * n_refunds


def format_dtype(df, add_monthid=False):
    """Normalise the dtypes of an aggregated result frame, modifying ``df`` in place.

    - ``pos_busn_dt``, when present, is parsed with ``pd.to_datetime``.
    - When ``add_monthid`` is true, a ``monthid`` string column (``YYYYMM``) is derived from
      ``pos_busn_dt``; this requires ``pos_busn_dt`` to exist (``KeyError`` otherwise).
    - Known id/count columns are cast to ``int`` and price columns to ``float`` when present
      (the presence check uses the columns as they were on entry).

    Returns
    -------
    pandas.DataFrame
        The same, mutated, ``df`` object.
    """
    present = set(df.columns.values)
    target_types = {
        'mcd_gbal_lcat_id_nu': 'int',
        'sld_menu_itm_id': 'int',
        'num_trans': 'int',
        'quantity': 'int',
        'WAP': 'float',
        'Mode_price': 'float',
    }

    if 'pos_busn_dt' in present:
        df['pos_busn_dt'] = pd.to_datetime(df['pos_busn_dt'])

    if add_monthid:
        df['monthid'] = df['pos_busn_dt'].dt.strftime('%Y%m')

    for col in (name for name in target_types if name in present):
        df[col] = df[col].astype(target_types[col])

    return df
           


In [3]:
# Day-level aggregation (quantity, transaction count) of digital transactions

def _normalize_combo_prices(df):
    """Keep only a-la-carte and combo-parent rows, pricing combo parents at the combo price.

    Rows with ``cmbo_pren_sld_menu_itm_id == -1`` are a-la-carte and kept unchanged.
    For the remaining (combo) rows:

    - sum ``pos_itm_grss_unt_prc_am`` over every row of the same
      (day, store, order, combo parent) group;
    - divide by the summed ``pos_itm_tot_qt`` of that group's parent rows
      (``sld_menu_itm_id == cmbo_pren_sld_menu_itm_id``), or by 1 when no parent row exists;
    - write the result into ``pos_itm_grss_unt_prc_am`` on every combo row.

    Child rows (combo members other than the parent) are then dropped.
    """
    combo_keys = ['pos_busn_dt', 'mcd_gbal_lcat_id_nu', 'pos_ord_nu', 'cmbo_pren_sld_menu_itm_id']

    alc_df = df[df['cmbo_pren_sld_menu_itm_id'] == -1].reset_index(drop=True)
    cmbo_df = df[df['cmbo_pren_sld_menu_itm_id'] != -1].reset_index(drop=True)

    # Summed unit price across all rows (parent + children) of each combo.
    cmbo_pr_df = cmbo_df.groupby(combo_keys)[['pos_itm_grss_unt_prc_am']].sum().reset_index()

    # Quantity of the combo parent row(s), used to turn the summed price into a per-combo price.
    is_parent = cmbo_df['cmbo_pren_sld_menu_itm_id'] == cmbo_df['sld_menu_itm_id']
    parent_qty = cmbo_df[is_parent].groupby(combo_keys)[['pos_itm_tot_qt']].sum().reset_index()

    cmbo_pr_df = cmbo_pr_df.merge(parent_qty, on=combo_keys, how='left')
    # Assign the result back: a chained ``col.fillna(..., inplace=True)`` does not update the
    # parent frame under pandas Copy-on-Write, which would leave NaN quantities/prices.
    cmbo_pr_df['pos_itm_tot_qt'] = cmbo_pr_df['pos_itm_tot_qt'].fillna(1)
    cmbo_pr_df['cmbo_price'] = cmbo_pr_df['pos_itm_grss_unt_prc_am'] / cmbo_pr_df['pos_itm_tot_qt']
    del cmbo_pr_df['pos_itm_grss_unt_prc_am']
    del cmbo_pr_df['pos_itm_tot_qt']

    cmbo_df = cmbo_df.merge(cmbo_pr_df, on=combo_keys, how='left')
    cmbo_df['pos_itm_grss_unt_prc_am'] = cmbo_df['cmbo_price']
    del cmbo_df['cmbo_price']

    # Concatenate back and drop combo child rows.
    combined = pd.concat([alc_df, cmbo_df], axis=0).reset_index(drop=True)
    keep = ((combined['cmbo_pren_sld_menu_itm_id'] == -1) |
            (combined['cmbo_pren_sld_menu_itm_id'] == combined['sld_menu_itm_id']))
    return combined[keep].reset_index(drop=True)


def func(day_level=True, result_filter='digital', offline_input_path=None,
         digital_input_path=None, digital_day_output_path=None, years=('2020', '2021')):
    """Aggregate digital transactions to day level, one GDW monthly dump at a time.

    For every entry of ``offline_input_path`` whose name contains one of ``years``
    (processed in reverse name order):

    1. load ``<entry>/data.parquet``, cast ``sld_menu_itm_id`` to int, price combo parents
       at the combo price and drop combo child rows (``_normalize_combo_prices``);
    2. if the directory ``digital_input_path/<entry>`` exists, left-join the
       (store, order) pairs in its ``part-0.parquet`` to flag digital orders;
    3. if ``result_filter`` contains 'digital' or 'all', digital data was found and
       ``day_level`` is true, write per-``pos_busn_dt`` ``quantity`` (sum of
       ``pos_itm_tot_qt``) and ``num_trans`` (``add_gc_count``) to
       ``digital_day_output_path``, partitioned by ``monthid``.

    Parameters
    ----------
    day_level : bool
        Whether to build and write the day-level aggregate.
    result_filter : str
        Substring-matched; 'digital' or 'all' enables the digital output.
    offline_input_path, digital_input_path, digital_day_output_path : str, optional
        Override the default Russia dev data locations (used when ``None``).
    years : iterable of str
        Only input entries whose names contain one of these substrings are processed.

    Notes
    -----
    NOTE(review): with the pyarrow engine, ``to_parquet(partition_cols=...)`` appears to add
    new files to existing partition directories rather than replace them. If so, re-running
    this function appends duplicate rows unless ``digital_day_output_path`` is cleared
    first -- confirm before relying on re-runs.
    """
    if offline_input_path is None:
        offline_input_path = '/opt/sasdata/dev/PricingEngines/DiscountEngines/Russia/data/raw/gdw_data/'
    if digital_input_path is None:
        digital_input_path = '/opt/sasdata/dev/PricingEngines/DiscountEngines/Russia/data/raw/digital/trans_ids/'
    if digital_day_output_path is None:
        digital_day_output_path = '/opt/sasdata/dev/PricingEngines/DiscountEngines/Russia/data/processed/digital/day_level/'

    file_list = sorted((x for x in os.listdir(offline_input_path) if any(y in x for y in years)),
                       reverse=True)
    print(file_list)

    grouper_cols_day = ['pos_busn_dt']
    order_keys = ['mcd_gbal_lcat_id_nu', 'pos_ord_nu']
    want_digital = 'digital' in result_filter or 'all' in result_filter

    for file in file_list:

        # Loading dumped GDW data
        df = pd.read_parquet(os.path.join(offline_input_path, file, 'data.parquet'))
        df['sld_menu_itm_id'] = df['sld_menu_itm_id'].astype('int')
        df = _normalize_combo_prices(df)

        # Flag digital orders when a digital id dump exists for this entry.
        digital_dir = os.path.join(digital_input_path, file)
        digital_flag = 0
        if os.path.isdir(digital_dir):
            dig_df = pd.read_parquet(os.path.join(digital_dir, 'part-0.parquet'))[['pos_ord_nu', 'mcd_gbal_lcat_id_nu']]
            dig_df['digital_flag'] = 1
            print(dig_df.shape)

            # Deduplicate before the left merge: a (store, order) pair listed more than once
            # would otherwise duplicate every matching item row and inflate quantities.
            df = df.merge(dig_df.drop_duplicates(), on=order_keys, how='left')

            del dig_df
            digital_flag = 1
        else:
            # np.NaN was removed in NumPy 2.0; np.nan works across versions.
            df['digital_flag'] = np.nan

        if want_digital and digital_flag == 1:

            # Keep only rows that matched a digital order.
            dig_temp_df = df[~df['digital_flag'].isna()].reset_index(drop=True)
            del df

            if day_level:
                quantity_df = dig_temp_df.groupby(grouper_cols_day)[['pos_itm_tot_qt']].sum().reset_index()
                # add_gc_count needs pos_busn_dt inside each group, so the grouping column is
                # deliberately part of the selection (pandas >= 2.2 may emit a DeprecationWarning).
                trans_df = dig_temp_df.groupby(grouper_cols_day)[['terr_pos_trn_typ_cd',
                                                                  'pos_busn_dt',
                                                                  'mcd_gbal_lcat_id_nu',
                                                                  'pos_ord_nu']].apply(add_gc_count).reset_index()
                dig_res_df_day = quantity_df.merge(trans_df, on=grouper_cols_day, how='outer')
                # apply() returns an unnamed Series, so its column is labelled 0 after reset_index.
                dig_res_df_day = dig_res_df_day.rename(columns={0: 'num_trans', 'pos_itm_tot_qt': 'quantity'})

                # Create the output directory if it doesn't exist.
                os.makedirs(digital_day_output_path, exist_ok=True)

                dig_res_df_day = format_dtype(dig_res_df_day, add_monthid=True)
                dig_res_df_day.to_parquet(digital_day_output_path, partition_cols=['monthid'])

                del dig_res_df_day

            # Clearing space
            del dig_temp_df
            gc.collect()

            print(file, 'is done')


In [4]:
# Run the day-level digital aggregation with the default input/output locations.
func(day_level=True, result_filter='digital')

['month_id=20210801', 'month_id=20210701', 'month_id=20210601', 'month_id=20210501', 'month_id=20210401', 'month_id=20210301', 'month_id=20210201', 'month_id=20210101', 'month_id=20201201', 'month_id=20201101', 'month_id=20201001', 'month_id=20200901', 'month_id=20200801', 'month_id=20200701', 'month_id=20200601', 'month_id=20200501', 'month_id=20200401', 'month_id=20200301', 'month_id=20200201', 'month_id=20200101']
(7420488, 3)
month_id=20210801 is done
(6341503, 3)
month_id=20210701 is done
(5540225, 3)
month_id=20210601 is done
(6404515, 3)
month_id=20210501 is done
(6572034, 3)
month_id=20210401 is done
(6548645, 3)
month_id=20210301 is done
(6087129, 3)
month_id=20210201 is done
(2634506, 3)
month_id=20210101 is done
(2564914, 3)
month_id=20201201 is done
(1808783, 3)
month_id=20201101 is done
(1707543, 3)
month_id=20201001 is done
(1401672, 3)
month_id=20200901 is done
(2058709, 3)
month_id=20200801 is done
(2040842, 3)
month_id=20200701 is done
(316600, 3)
month_id=20200601 is 