In [2]:
import itertools
import os
import sys

import pandas as pd

# Put the notebook's parent directory on sys.path (once) so that modules living
# one level up can be imported from this notebook.
project_path = os.path.abspath('..')

if sys.path.count(project_path) == 0:
    sys.path.append(project_path)

## Read Data

In [20]:
# Load the price and promo tables from the data directory next to the notebook folder.
price = pd.read_csv('../data/DPS_PRICE.csv')
promo = pd.read_csv('../data/DPS_PROMO.csv')
# Preview a single randomly chosen price row (no seed is set, so the row differs between runs).
price.sample()

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,PERIOD_START_DT,PERIOD_END_DT,PRICE,PRICE_TYPE,MODIFIED_DTTM,DELETE_FLG
935,80005,600011,,,24-May-20,18-Aug-20,10.01,regular,01JUL2021:14:31:07,0


In [18]:
# Accumulator for the rows flagged by the checks below; starts out empty.
data_quality_output = pd.DataFrame()

## Check quality of data

### Check values range

In [36]:
def check_val_range(data_quality_output, table, target_col, th=0):
    """
    Collect the rows of ``table`` whose ``target_col`` value is below ``th``.

    Parameters
    ----------
    data_quality_output : pd.DataFrame
        Rows flagged so far.
    table : pd.DataFrame
        Table to inspect.
    target_col : str
        Name of the column compared against ``th``.
    th : float, default 0
        Lower bound; rows with ``target_col < th`` are flagged.

    Returns
    -------
    pd.DataFrame
        ``data_quality_output`` itself when nothing is flagged, otherwise a new
        frame with the flagged rows appended.
    """
    below_threshold = table.loc[table[target_col] < th]

    # Nothing to report: hand back the accumulator untouched.
    if below_threshold.empty:
        return data_quality_output

    return pd.concat([data_quality_output, below_threshold])

In [25]:
# Flag promo rows whose PROMO_PRICE is negative (th defaults to 0).
# NOTE: the accumulator is rebound to the result, so re-running this cell appends
# any flagged rows a second time.
data_quality_output = check_val_range(data_quality_output, promo, 'PROMO_PRICE')

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,PERIOD_START_DT,PERIOD_END_DT,PRICE,PRICE_TYPE,MODIFIED_DTTM,DELETE_FLG,PROMO_ID,PROMO_PRICE,PROMO_TYPE


### Check cross consistency

In [33]:
# Load the sell-out (sales) table and preview one randomly chosen row.
sales = pd.read_csv('../data/DPS_SELL_OUT.csv')
sales.sample()

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,PERIOD_DT,ORDERS_QTY,ORDERS_AMOUNT,SHIPMENTS_QTY,SHIPMENTS_AMOUNT,INVOICES_QTY,INVOICES_AMOUNT,RETURNS_QTY,RETUNRS_AMOUNT,PROMO_FLG,PROMO_ID,COST,MODIFIED_DTTM,DELETE_FLG
38336,80071,600010,6000007,1,8-Mar-20,8926,195300.88,8838,193375.44,8838,193375.44,0,0,0,0,14.24,01JUL2021:14:31:06,0


In [72]:
def check_cross_consistency(data_quality_output, df1, df2):
    """
    Collect the ID combinations that occur in ``df1`` but not in ``df2``.

    The key is every column name shared by both frames that contains ``'ID'``.
    Both frames are de-duplicated on that key and left-joined; rows of ``df1``
    without a partner in ``df2`` are appended to the accumulator, including
    the merged columns and the ``_merge`` indicator.

    Parameters
    ----------
    data_quality_output : pd.DataFrame
        Rows flagged so far.
    df1 : pd.DataFrame
        Table whose keys are looked up.
    df2 : pd.DataFrame
        Table the keys are expected to appear in.

    Returns
    -------
    pd.DataFrame
        ``data_quality_output`` itself when there is no shared ID column or
        nothing is missing, otherwise a new frame with the unmatched rows appended.
    """
    shared = df1.columns.intersection(df2.columns)
    key_cols = list(shared[shared.str.contains('ID')])

    # Without a shared ID column there is nothing to join on.
    if not key_cols:
        return data_quality_output

    left = df1.drop_duplicates(key_cols)
    right = df2.drop_duplicates(key_cols)
    joined = left.merge(right, on=key_cols, how='left', indicator=True)

    unmatched = joined[joined['_merge'] == 'left_only']
    if unmatched.empty:
        return data_quality_output

    return pd.concat([data_quality_output, unmatched])

In [73]:
# Compare price against sales: shows the ID combinations present in `price`
# that have no counterpart in `sales`. The original call used `df1` / `df2`,
# which are never defined in this notebook and only existed as leftover kernel state.
check_cross_consistency(data_quality_output, price, sales)

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,PERIOD_START_DT,PERIOD_END_DT,PRICE,PRICE_TYPE,MODIFIED_DTTM,DELETE_FLG,...,INVOICES_QTY,INVOICES_AMOUNT,RETURNS_QTY,RETUNRS_AMOUNT,PROMO_FLG,PROMO_ID,COST,MODIFIED_DTTM_y,DELETE_FLG_y,_merge
0,80001,600002,,,12-Jan-20,7-Apr-20,19.68,regular,,,...,,,,,,,,,,left_only
1,80001,600004,,,12-Jan-20,7-Apr-20,20.85,regular,,,...,,,,,,,,,,left_only
2,80001,600009,,,12-Jan-20,7-Apr-20,21.96,regular,,,...,,,,,,,,,,left_only
3,80001,600010,,,12-Jan-20,7-Apr-20,22.13,regular,,,...,,,,,,,,,,left_only
4,80001,600011,,,12-Jan-20,7-Apr-20,21.13,regular,,,...,,,,,,,,,,left_only
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
390,80083,600005,,,31-May-20,25-Aug-20,9.69,regular,,,...,,,,,,,,,,left_only
391,80083,600009,,,31-May-20,25-Aug-20,10.20,regular,,,...,,,,,,,,,,left_only
392,80083,600010,,,31-May-20,25-Aug-20,9.69,regular,,,...,,,,,,,,,,left_only
393,80083,600011,,,31-May-20,25-Aug-20,10.21,regular,,,...,,,,,,,,,,left_only


### Check time cross consistency

In [81]:
# Load the stock table and preview one randomly chosen row.
stock = pd.read_csv('../data/DPS_STOCK.csv')
stock.sample()

Unnamed: 0,PRODUCT_ID,LOCATION_ID,PERIOD_DT,STOCK_QTY,MODIFIED_DTTM,DELETE_FLG
36,80011,600005,28-Feb-21,25766,01JUL2021:14:31:06,0


In [80]:
# Show a random sales row again, next to the stock preview, to compare the column layouts.
sales.sample()

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,PERIOD_DT,ORDERS_QTY,ORDERS_AMOUNT,SHIPMENTS_QTY,SHIPMENTS_AMOUNT,INVOICES_QTY,INVOICES_AMOUNT,RETURNS_QTY,RETUNRS_AMOUNT,PROMO_FLG,PROMO_ID,COST,MODIFIED_DTTM,DELETE_FLG
30048,80057,600012,6000014,1,27-Sep-20,1989,30829.5,1969,30519.5,1969,30519.5,0,0,0,0,10.69,01JUL2021:14:31:06,0


In [79]:
# Load the assortment matrix and preview one randomly chosen row.
assort_matrix = pd.read_csv('../data/DPS_ASSORT_MATRIX.csv')
assort_matrix.sample()

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,START_DT,END_DT,STATUS,MODIFIED_DTTM,DELETE_FLG
12,80002,600010,6000007,1,17-Nov-19,,active,01JUL2021:14:31:07,0


In [None]:
def check_time_cross_consistency(data_quality_output, df1, df2, th=0):
    """
    Collect PRODUCT/LOCATION + date combinations of ``df1`` that are missing
    from (or under-represented in) ``df2``.

    The join key is built from the column names shared by both frames:
    ID columns that mention ``PRODUCT`` or ``LOCATION``, plus every column
    ending in ``_DT``. Both frames are de-duplicated on that key and left-joined.

    Parameters
    ----------
    data_quality_output : pd.DataFrame
        Rows flagged so far.
    df1 : pd.DataFrame
        Table whose key/date combinations are looked up.
    df2 : pd.DataFrame
        Table the combinations are expected to appear in.
    th : int, default 0
        Matched key combinations whose row count is ``<= th`` are flagged as well.
        With the default of 0 only combinations missing from ``df2`` are reported.

    Returns
    -------
    pd.DataFrame
        ``data_quality_output`` itself when the frames share no usable key or
        nothing is flagged, otherwise a new frame with the flagged key rows appended.
    """
    shared = df1.columns.intersection(df2.columns)
    id_cols = [
        col for col in shared[shared.str.contains('ID')]
        if 'PRODUCT' in col or 'LOCATION' in col
    ]
    dt_cols = list(shared[shared.str.endswith('_DT')])

    # Both an ID part and a date part are required to build the join key.
    if not id_cols or not dt_cols:
        return data_quality_output

    key_cols = id_cols + dt_cols

    merged = df1.drop_duplicates(key_cols).merge(
        df2.drop_duplicates(key_cols), on=key_cols, how='left', indicator=True
    )

    # Key combinations present in df1 only (the original indexed with the
    # undefined name `cols`, which raised NameError).
    missing = merged.loc[merged['_merge'] == 'left_only', key_cols]
    if not missing.empty:
        data_quality_output = pd.concat([data_quality_output, missing])

    # Key combinations found in both tables but with at most `th` rows.
    # NOTE(review): both sides are de-duplicated on key_cols before the merge, so
    # each matched combination appears to have cnt == 1 -- confirm the intended
    # meaning of `th`.
    matched = merged[merged['_merge'] == 'both']
    counts = matched.groupby(key_cols).size().reset_index(name='cnt')
    sparse = counts[counts['cnt'] <= th].drop('cnt', axis=1)
    if not sparse.empty:
        data_quality_output = pd.concat([data_quality_output, sparse])

    return data_quality_output


In [324]:
# Run the time cross-consistency check for stock against the assortment matrix;
# the result is only displayed, not stored back into data_quality_output.
# NOTE(review): the saved output of this cell contains WARNING_TYPE / WARNING columns
# that the function defined above does not create -- presumably stale; re-run to confirm.
check_time_cross_consistency(data_quality_output, stock, assort_matrix)

Unnamed: 0,PRODUCT_ID,LOCATION_ID,CUSTOMER_ID,DISTR_CHANNEL_ID,WARNING_TYPE,WARNING,PERIOD_DT
0,80001,600002,6000015.0,1.0,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,
1,80001,600002,6000018.0,1.0,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,
2,80001,600004,6000019.0,1.0,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,
3,80001,600009,6000020.0,1.0,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,
4,80001,600010,6000007.0,1.0,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,
...,...,...,...,...,...,...,...
269,80083,600005,,,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21
270,80083,600009,,,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21
271,80083,600010,,,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21
272,80083,600011,,,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21


In [320]:
class DQ:
    """
    Data-quality checker that runs a set of checks over CSV tables.

    Each check reads its tables from ``<data_path><table_name>.csv`` and appends
    the offending rows, annotated with ``WARNING_TYPE`` / ``WARNING`` columns,
    to ``self.data_quality_output``.
    """

    def __init__(self, check_id,
                 check_name, client,
                 input_tables, th_values,
                 lvl_data, data_path
                ):
        """
        Parameters
        ----------
        check_id, check_name, client
            Stored on the instance as-is; the checks in this class do not read them.
        input_tables : dict
            Tables per check; ``check()`` reads the keys ``'val_range'``,
            ``'cross_consistency'`` and ``'time_cross_consistency'``.
        th_values : dict
            Thresholds per check; ``check()`` reads the keys ``'val_range'``
            and ``'time_cross_consistency'``.
        lvl_data : dict
            Maps an ID prefix (e.g. ``'LOCATION'``) to the name of its level table.
        data_path : str
            Prefix prepended to ``<table_name>.csv``; it is concatenated verbatim,
            so a directory must end with a path separator.
        """
        self.check_id = check_id
        self.check_name = check_name
        self.client = client
        self.input_tables = input_tables
        self.th_values = th_values
        self.lvl_data = lvl_data
        self.data_path = data_path
        self.data_quality_output = pd.DataFrame()

    def _read_table(self, table_name):
        """Read ``<data_path><table_name>.csv`` into a DataFrame."""
        return pd.read_csv(self.data_path + table_name + '.csv')

    @staticmethod
    def _id_columns(columns):
        """Return, as a list, the column names in ``columns`` that contain ``'ID'``."""
        return list(columns[columns.str.contains('ID')])

    def _append_warnings(self, rows, **meta):
        """
        Append ``rows`` plus the constant ``meta`` columns to data_quality_output.

        ``assign`` works on a copy, so the filtered slice passed in is never
        written to (the original in-place column assignment on a slice triggered
        pandas' SettingWithCopyWarning). Empty ``rows`` are ignored.
        """
        if rows.empty:
            return
        self.data_quality_output = pd.concat(
            [self.data_quality_output, rows.assign(**meta)]
        )

    def check_val_range(self, tables, th=0):
        """
        Checks that the column values are not less than the specified value
        (for example, that the prices are not negative).

        Parameters
        ----------
        tables : list of tuples
            List of checking tables and checking columns [(table1, col1), (table2, col2)]

        th : float
            Rows where target_col is less than th are added to data_quality_output

        Returns
        -------
        pd.DataFrame
            The data_quality_output table with the flagged rows appended
        """
        for table_name, target_col in tables:
            table = self._read_table(table_name)
            self._append_warnings(
                table[table[target_col] < th],
                INPUT_COLUMN=target_col,
                INPUT_TABLE=table_name,
                INPUT_VALUE=th,
                WARNING_TYPE='val_range',
                WARNING=f'values in column {target_col} are less than {th} in table {table_name}',
            )

        return self.data_quality_output

    def check_cross_consistency(self, tables):
        """
        Checks that there are no key fields in the first table that are missing in the second table.
        Every ordered pair of tables is checked.

        Parameters
        ----------
        tables : list
            List of checking tables [table1, table2, table3]

        Returns
        -------
        pd.DataFrame
            The data_quality_output table with the flagged rows appended
        """
        # Read every table once instead of once per ordered pair.
        frames = {name: self._read_table(name) for name in tables}

        for df1_name, df2_name in itertools.permutations(tables, 2):
            df1, df2 = frames[df1_name], frames[df2_name]
            key_cols = self._id_columns(df1.columns.intersection(df2.columns))

            # A pair without shared ID columns is skipped; the remaining pairs
            # must still be checked (the original `break` dropped them all).
            if not key_cols:
                continue

            merged = df1.drop_duplicates(key_cols).merge(
                df2.drop_duplicates(key_cols), on=key_cols,
                how='left', indicator=True
            )

            self._append_warnings(
                merged.loc[merged['_merge'] == 'left_only', key_cols],
                INPUT_TABLE=df1_name + ' && ' + df2_name,
                WARNING_TYPE='cross_consistency',
                WARNING=f'id rows from table {df1_name} doesnot appear in table {df2_name}',
            )

        return self.data_quality_output

    def check_time_cross_consistency(self, tables, th):
        """
        Checks tables for time cross-consistency (i.e. finding product_id - location_id pairs
        that have been in the SALES table and not in the STOCK table for some period)

        Parameters
        ----------
        tables : list
            List of compared tables [[table1, table2], [table3, table4]]

        th : int
            Matched key combinations whose row count is less than or equal to th
            are added to data_quality_output, in addition to the combinations
            that are missing from table2 altogether

        Returns
        -------
        pd.DataFrame
            The data_quality_output table with the flagged rows appended
        """
        for df1_name, df2_name in tables:
            df1, df2 = self._read_table(df1_name), self._read_table(df2_name)
            shared = df1.columns.intersection(df2.columns)
            id_cols = [
                col for col in self._id_columns(shared)
                if 'PRODUCT' in col or 'LOCATION' in col
            ]
            dt_cols = list(shared[shared.str.endswith('_DT')])

            # Skip only this pair when no join key can be built; later pairs
            # must still be checked (the original `break` dropped them all).
            if not id_cols or not dt_cols:
                continue

            key_cols = id_cols + dt_cols

            merged = df1.drop_duplicates(key_cols).merge(
                df2.drop_duplicates(key_cols), on=key_cols,
                how='left', indicator=True
            )

            meta = dict(
                INPUT_TABLE=df1_name + ' && ' + df2_name,
                INPUT_VALUE=th,
                WARNING_TYPE='time_cross_consistency',
                WARNING=f'id rows from table {df1_name} doesnot appear in table {df2_name}',
            )

            # Key/date combinations that exist only in the first table.
            self._append_warnings(
                merged.loc[merged['_merge'] == 'left_only', key_cols], **meta
            )

            # Matched combinations with at most `th` rows.
            # NOTE(review): both sides are de-duplicated on key_cols before the
            # merge, so every matched combination appears to have cnt == 1; with
            # th >= 1 all matched rows get flagged -- confirm this is intended.
            matched = merged[merged['_merge'] == 'both']
            counts = matched.groupby(key_cols).size().reset_index(name='cnt')
            self._append_warnings(
                counts[counts['cnt'] <= th].drop('cnt', axis=1), **meta
            )

        return self.data_quality_output

    def format_output(self, lvl_data):
        """
        Replace the plain ``<PREFIX>_ID`` columns of data_quality_output with
        level-numbered ones.

        For each prefix, the level table is read and its ``<PREFIX>_LVL_ID*``
        columns are counted; the ``<PREFIX>_ID`` column is then stored as
        ``<PREFIX>_LVL_ID<count + 1>`` (nullable integer) together with a
        ``<PREFIX>_LVL`` column holding ``count + 1``. Prefixes whose ID column
        is absent from data_quality_output are skipped.

        Parameters
        ----------
        lvl_data : dict
            Information about ID lvls (LOCATION, CUSTOMER, etc.): prefix -> level table name

        Returns
        -------
        pd.DataFrame
            Formatted data_quality_output table
        """
        for el, lvl_table in lvl_data.items():
            id_col = f'{el}_ID'
            if id_col not in self.data_quality_output.columns:
                continue

            cols = self._read_table(lvl_table).columns
            last_lvl = len(cols[cols.str.contains(f'{el}_LVL_ID')])

            # Int64 (nullable) keeps missing IDs as <NA> instead of turning IDs into floats.
            self.data_quality_output[f'{el}_LVL_ID{last_lvl + 1}'] = (
                self.data_quality_output[id_col].astype('Int64')
            )
            self.data_quality_output[f'{el}_LVL'] = last_lvl + 1
            self.data_quality_output = self.data_quality_output.drop(id_col, axis=1)

        return self.data_quality_output

    def check(self):
        """
        Run every configured check, then format the output.

        Returns
        -------
        pd.DataFrame
            The formatted data_quality_output table
        """
        self.check_val_range(
            self.input_tables['val_range'],
            self.th_values['val_range']
        )
        self.check_cross_consistency(self.input_tables['cross_consistency'])

        self.check_time_cross_consistency(self.input_tables['time_cross_consistency'],
                                          self.th_values['time_cross_consistency']
                                         )

        return self.format_output(self.lvl_data)


In [321]:
# Thresholds per check, looked up by DQ.check() under the same keys as input_tables.
th_values = {
    'val_range': 0,
    'time_cross_consistency': 2,
}

# Tables fed to each check:
#   val_range              -> (table, column) pairs
#   cross_consistency      -> table names; every ordered pair is compared
#   time_cross_consistency -> explicit [table1, table2] pairs (both directions listed here)
input_tables = {
    'val_range': [('DPS_PRICE', 'PRICE'), ('DPS_PROMO', 'PROMO_PRICE')],
    'cross_consistency': ['DPS_SELL_OUT', 'DPS_PRICE', 'DPS_STOCK'],
    'time_cross_consistency': [['DPS_SELL_OUT', 'DPS_STOCK'], ['DPS_STOCK', 'DPS_SELL_OUT']],
}

# ID prefix -> name of the table read by DQ.format_output for that prefix.
lvl_data = {
    'LOCATION': 'DPS_LOCATION',
    'PRODUCT': 'DPS_PRODUCT',
    'CUSTOMER': 'DPS_CUSTOMER',
    'DISTR_CHANNEL': 'DPS_DISTR_CHANNEL'
}

# data_path is concatenated directly with '<table>.csv', so the trailing slash is required.
dq = DQ(check_id=123, check_name='First check', client=666,
        input_tables=input_tables, th_values=th_values, lvl_data=lvl_data,
        data_path='../data/'
       )

In [322]:
# Run all configured checks and format the ID columns; results accumulate on
# dq.data_quality_output, so re-running this cell on the same instance appends again.
dq.check()

In [323]:
# Display the accumulated data-quality findings.
dq.data_quality_output

Unnamed: 0,INPUT_TABLE,WARNING_TYPE,WARNING,PERIOD_DT,INPUT_VALUE,LOCATION_LVL_ID6,LOCATION_LVL,PRODUCT_LVL_ID8,PRODUCT_LVL,CUSTOMER_LVL_ID6,CUSTOMER_LVL,DISTR_CHANNEL_LVL_ID2,DISTR_CHANNEL_LVL
0,DPS_SELL_OUT && DPS_PRICE,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,,,600002,6,80001,8,6000015,6,1,2
1,DPS_SELL_OUT && DPS_PRICE,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,,,600002,6,80001,8,6000018,6,1,2
2,DPS_SELL_OUT && DPS_PRICE,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,,,600004,6,80001,8,6000019,6,1,2
3,DPS_SELL_OUT && DPS_PRICE,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,,,600009,6,80001,8,6000020,6,1,2
4,DPS_SELL_OUT && DPS_PRICE,cross_consistency,id rows from table DPS_SELL_OUT doesnot appear...,,,600010,6,80001,8,6000007,6,1,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...
269,DPS_STOCK && DPS_SELL_OUT,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21,2.0,600005,6,80083,8,,6,,2
270,DPS_STOCK && DPS_SELL_OUT,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21,2.0,600009,6,80083,8,,6,,2
271,DPS_STOCK && DPS_SELL_OUT,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21,2.0,600010,6,80083,8,,6,,2
272,DPS_STOCK && DPS_SELL_OUT,time_cross_consistency,id rows from table DPS_STOCK doesnot appear in...,28-Feb-21,2.0,600011,6,80083,8,,6,,2
