In [1]:
from utils.shuqing_all import *

from scipy.stats import ttest_rel, ks_2samp
from math import sqrt
from scipy.special import rel_entr,kl_div
import os
from scipy.stats import chi2,chi2_contingency
from statsmodels.stats.contingency_tables import cochrans_q
from scipy.stats import wilcoxon
from datetime import datetime

## Objective:
1. In order to detect if input drift has occurred, we used statistical tests to compare input data we used to train RFM model in April with current input data in August. If p-value from the test is less than 5%, we conclude that input drift has occurred.

List of statistical tests we used:

- T test
- The two sample Kolmogorov-Smirnov
- KL divergence 
- Relative entropy

2. In order to detect if output drift has occurred, we used statistical tests to compare RFM scores distribution per group in April against the scores in August. If p-value from the test is less than 5%, we conclude that output drift has occurred.

List of statistical tests we used:

- Chi-squared test
- Cochran's Q test
- Wilcoxon test




In [2]:
def download_data(project_id,auth= None, sign= None, time_constraint = None,query =None):
    """Downlaod and authenticate the data"""
    if project_id is None: raise ValueError('Argument cannot be none')
    auth = pydata_google_auth.get_user_credentials(scopes=["https://www.googleapis.com/auth/bigquery"]) 
    query = query 
    if time_constraint is not None: query +=f"WHERE started_at {sign}{time_constraint}"
    df=pd.read_gbq(query=query, project_id=project_id,credentials=auth, 
                 progress_bar_type="tqdm_notebook", use_bqstorage_api=True)
    return df
PROJECT = 'uw-data-warehouse-prod'
raw_table="""
SELECT * FROM `uw-data-models-prod.partner_dataform_models_position.rfm_raw_data`
WHERE snapshot_date >='2021-08-24'
"""
rfm="""
SELECT * FROM `uw-data-models-prod.partner_dataform_models_feature_catalogue.partner_position_rfm_score` 
WHERE  is_rfm_eligible = true
"""

In [3]:
raw_df=download_data(project_id=PROJECT,query=raw_table)

Downloading:   0%|          | 0/356648 [00:00<?, ?rows/s]

In [None]:
rfm_df=download_data(project_id=PROJECT,query=rfm)

We compute clusters based on 4 metrics:

LTV gathered in the last year (this would be the "M"onetary value in the RFM lingo)

Number of customers gathered in the last year (Frequency)

How many days have passed since the last activity -customer gathering, partner recruitment, supporting activity- (Recency)



In [4]:
class Inputdrift:
    """
    Compute the tests for input data drift
    Null hypothesis: same distribution for training data and current data
    if pvalue <threshold(level of confidence, which is 0.05 for now), reject null h0, the distribution has changed
    Source table : `uw-data-models-prod.partner_dataform_models_position.rfm_raw_data`
    """
    from math import sqrt
    from scipy.special import rel_entr,kl_div   
    from scipy.stats import ttest_rel, ks_2samp
    from datetime import datetime
    
    def __init__(self,df):
        self.df=df
        self.training_date=None
        self.current_date=None
        self.d1=None
        self.d2=None
    

    def prepare_input_drift_data(self,training_date,current_date):
        if training_date is None: training_date='2021-08-24' 
        else: 
            training_date
        if current_date is None: 
            current_date='2021-08-15' 
        else: 
            current_date
            
        feats=['title_group','recency', 'frequency', 'value']
        training_df=self.df.loc[self.df.snapshot_date==training_date,feats+['partner_position_id']].set_index('partner_position_id')
        current_df=self.df.loc[self.df.snapshot_date==current_date,feats+['partner_position_id']].set_index('partner_position_id')
        training_df['value']=training_df.value.astype('float')
        current_df['value']=current_df.value.astype('float')
        drift_df=training_df.join(current_df,how='inner',rsuffix='_current')
        drift_df=drift_df.reset_index()
        # Count how many recency outlier partners who have training recency > current recency
        outlier_list=[]
        for idx, row in drift_df.iterrows():
            if row['recency']>row['recency_current']:
                outlier_list.append(row['partner_position_id'])
        drift_df['count_recency_outliers']=np.where(drift_df.partner_position_id.isin(outlier_list),1,0)
        
        #function to calculate how many days have passed between traning_date and current_date
        def days_between(d1, d2):
            d1 = datetime.strptime(d1, "%Y-%m-%d")
            d2 = datetime.strptime(d2, "%Y-%m-%d")
            return abs((d2 - d1).days)
                      
        #for recency col, we take days between training_date and  current_date away from receny_current to account for 
        #sensitivity for ttest recency, we also create respective fre and value col for later ttest 
        
        drift_df['recency_current_cleaned']=drift_df['recency_current']-days_between(training_date,current_date)
        drift_df['frequency_current_cleaned']=drift_df['frequency_current']
        drift_df['value_current_cleaned']=drift_df['value_current']
        return drift_df
    
    def cohen_d(self,d1,d2):
        """
        Cohen's d is an appropriate effect size for the comparison between two means
        used to accompany the reporting of t test Cohen's d = (M2 - M1) ⁄ SDpooled
        """
        s=sqrt((d1.var()+d2.var())/2)
        return (d1.mean()-d2.mean())/s
    
    def compute_summary_stats_table(self,*args):
        """
        This test compares the underlying continuous distributions F(x) and G(x) of two independent samples
        pvalue <5%, reject null h0, the distribution has changed       
        """ 
        training_date,current_date,cohen_threshold,tt_test_threshold,ks_threshold =args
        
        # bring in the training and current data 
        drift_df=self.prepare_input_drift_data(training_date,current_date)
        #rfm cols for stats test
        rfm_cols=['recency', 'frequency', 'value']
        
        # preparing the df which contains test date, various stats tests' pvalue, drfit threshold
        table=pd.DataFrame({'test_date':current_date
                            
                            #cohen_d:  https://en.wikiversity.org/wiki/Cohen%27s_d
                            ,'cohen_d':[self.cohen_d(drift_df[f],drift_df[f+'_current']) for f in rfm_cols],
                           #ttest: https://en.wikipedia.org/wiki/Student%27s_t-test
                            'ttest_pvalue':[ttest_rel(drift_df[f],drift_df[f+'_current_cleaned']).pvalue for f in rfm_cols],
                           #ks_2sample: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
                            'ks_2sample_pvalue':[ks_2samp(drift_df[f],drift_df[f+'_current']).pvalue for f in rfm_cols],
                           # kl_div: https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence
                            'kl_div':[sum(kl_div(drift_df[f],drift_df[f+'_current'])) for f in rfm_cols],
                           #rel_entr: https://en.wikipedia.org/wiki/Quantum_relative_entropy
                            #https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.rel_entr.html
                            'rel_entr':[sum(rel_entr(drift_df[f],drift_df[f+'_current'])) for f in rfm_cols],
                      
                            'cohen_drift_threshold':cohen_threshold,             
                            'ttest_drift_threshold': tt_test_threshold,
                            'ks_drift_threshold': ks_threshold},
                             index=rfm_cols)
        # need to use rfm col later,so have it as col now
        table=table.reset_index().rename(columns={'index':'rfm'})
    
        #add in cols of whether or not drift has occured based on each stats tests
        table['cohen_d_drift']=np.where(table.cohen_d<cohen_threshold,0,1)

        table['ttest_drift']=np.where(table.ttest_pvalue>tt_test_threshold,0,1)

        table['ks_2sample_drift']=np.where(table.ks_2sample_pvalue>ks_threshold,0,1)

        #Count how many recency outlier partners by summing up drift_df count_recency_outliers col
        table.loc[table.rfm=='recency','outlier_recency_count']=drift_df['count_recency_outliers'].sum()

        #total number of drifts based on stats tests
        table['number_of_drifts_detected']=table.cohen_d_drift+table.ttest_drift+table.ks_2sample_drift
        
        return table[['rfm','test_date','cohen_d', 'ttest_pvalue', 'ks_2sample_pvalue', 'kl_div', 'rel_entr',
       'cohen_drift_threshold','ttest_drift_threshold','ks_drift_threshold','cohen_d_drift','ttest_drift','ks_2sample_drift',
       'outlier_recency_count',
                      'number_of_drifts_detected']]

In [5]:
inputdrift_group1=Inputdrift(raw_df[raw_df.title_group=='group_1'])
inputdrift_group2=Inputdrift(raw_df[raw_df.title_group=='group_2'])
inputdrift_group3=Inputdrift(raw_df[raw_df.title_group=='group_3'])

In [6]:
inputdrift_group1.prepare_input_drift_data('2021-08-24','2021-08-25')

Unnamed: 0,partner_position_id,title_group,recency,frequency,value,title_group_current,recency_current,frequency_current,value_current,count_recency_outliers,recency_current_cleaned,frequency_current_cleaned,value_current_cleaned
0,AA6213,group_1,337,0,0.0,group_1,338,0,0.0,0,337,0,0.0
1,N75499,group_1,43,0,0.0,group_1,44,0,0.0,0,43,0,0.0
2,N70364,group_1,277,0,0.0,group_1,278,0,0.0,0,277,0,0.0
3,AB8923,group_1,70,0,0.0,group_1,71,0,0.0,0,70,0,0.0
4,N69566,group_1,308,0,0.0,group_1,309,0,0.0,0,308,0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1779,AA6315,group_1,269,4,4.3,group_1,270,4,4.3,0,269,4,4.3
1780,AB6097,group_1,112,6,60.4,group_1,113,6,60.4,0,112,6,60.4
1781,AA6900,group_1,187,6,16.2,group_1,188,6,16.2,0,187,6,16.2
1782,AA4345,group_1,270,8,10.3,group_1,271,8,10.3,0,270,8,10.3


In [9]:
# Null hypothesis: same distribution for training data and current data
# if p value is less than threshold, drift has happened, see drift_detection cols
inputdrift_group1.compute_summary_stats_table('2021-08-24','2021-08-25',
                             0.02, 0.01, 0.05 )

Unnamed: 0,rfm,test_date,cohen_d,ttest_pvalue,ks_2sample_pvalue,kl_div,rel_entr,cohen_drift_threshold,ttest_drift_threshold,ks_drift_threshold,cohen_d_drift,ttest_drift,ks_2sample_drift,outlier_recency_count,number_of_drifts_detected
0,recency,2021-08-25,-0.006674,0.062745,1.0,2004.312419,750.312419,0.02,0.01,0.05,0,0,0,10.0,0
1,frequency,2021-08-25,-0.009688,0.003866,1.0,3.473993,-6.526007,0.02,0.01,0.05,0,1,0,,1
2,value,2021-08-25,0.006783,0.010446,1.0,inf,inf,0.02,0.01,0.05,0,0,0,,0


In [10]:
inputdrift_group2.compute_summary_stats_table('2021-08-24','2021-08-25',
                             0.02, 0.01, 0.05 )

Unnamed: 0,rfm,test_date,cohen_d,ttest_pvalue,ks_2sample_pvalue,kl_div,rel_entr,cohen_drift_threshold,ttest_drift_threshold,ks_drift_threshold,cohen_d_drift,ttest_drift,ks_2sample_drift,outlier_recency_count,number_of_drifts_detected
0,recency,2021-08-25,-0.005737,1.055501e-08,0.945482,12026.2,6283.196616,0.02,0.01,0.05,0,1,0,95.0,1
1,frequency,2021-08-25,0.000875,4.266804e-06,1.0,18.54904,88.54904,0.02,0.01,0.05,0,1,0,,1
2,value,2021-08-25,0.002213,3.177143e-25,1.0,inf,inf,0.02,0.01,0.05,0,1,0,,1


In [11]:
inputdrift_group3.compute_summary_stats_table('2021-08-24','2021-08-26',
                             0.02, 0.01, 0.05  )

Unnamed: 0,rfm,test_date,cohen_d,ttest_pvalue,ks_2sample_pvalue,kl_div,rel_entr,cohen_drift_threshold,ttest_drift_threshold,ks_drift_threshold,cohen_d_drift,ttest_drift,ks_2sample_drift,outlier_recency_count,number_of_drifts_detected
0,recency,2021-08-26,-0.001259,0.000412,0.999824,321.954742,-507.045258,0.02,0.01,0.05,0,1,0,21.0,1
1,frequency,2021-08-26,0.00047,0.655156,1.0,inf,inf,0.02,0.01,0.05,0,0,0,,0
2,value,2021-08-26,0.003769,1.8e-05,1.0,inf,inf,0.02,0.01,0.05,0,1,0,,1


In [8]:
class OutRFMdrift:
    
    from scipy.stats import chi2,chi2_contingency
    from statsmodels.stats.contingency_tables import cochrans_q
    from scipy.stats import wilcoxon
    """
    compute the statistical tests for catogorical rfm scoring output
    Null hypothesis: same distribution for RFM Score at baseline and current RFM score 
    
    if pvalue < threshold(level of confidence, which is 0.05 for now), reject null h0, the distribution has changed
    source table: `uw-data-models-prod.partner_dataform_models_position.monthly_rfm_score`
                  `uw-data-models-prod.partner_dataform_models_position.position` 
    Map partners'titles with title groups:
    group_1 = ['D']
    group_2 = ['QD', 'TL', 'STL']
    group_3 = ['GL', 'SGL', 'NNL', 'NGL']

    group_map = {'D': 'group_1'}
    for t in group_2: group_map[t] = 'group_2'
    for t in group_3: group_map[t] = 'group_3'
    Then merge monthly_rfm_score with position to get title group for each partners 
    """
    def __init__(self,df):
        self.df=df
        self.training_date=None
        self.current_date=None
    
    def prepare_comparison_data(self,training_date,current_date):
        """
        use monthly rfm table to prepare for comparison dfs 
        prepare the table for statistical tests for 3 groups
        """
        if training_date is None: training_date='2021-08-24' 
        else: 
            training_date
        if current_date is None: 
            current_date='2021-09-04' 
        else: 
            current_date
            
        training_df=self.df.loc[self.df.snapshot_date==training_date]
        current_df=self.df.loc[self.df.snapshot_date==current_date] 
        #prepare training and current data which contains col of rfm_score and col of volumn of people fall into each score
        training=training_df.groupby(['title_group'])['rfm_score'].value_counts().unstack().T
        current=current_df.groupby(['title_group'])['rfm_score'].value_counts().unstack().T
        
        # temporary solution to remove rfm score in current data which doesn't appear in training data 
        #get the list of rfm scores in training 
        s=set(training.index.tolist()) 
        # find the missing score in current compared to training
        missing_score=[x for x in current.index.tolist() if x not in s]
        # only remove missing score in current if len of missing score >0  
        if len(missing_score)>0:          
            current.drop(index=missing_score[0],inplace=True)
        else:
            current
        #prepare for table for each group which contains rfm distribution in training data and current data respectively 
        table_1=[training['group_1'].values,current['group_1'].values]
        table_2=[training['group_2'].values,current['group_2'].values]
        table_3=[training['group_3'].values,current['group_3'].values]
        
        return table_1,table_2,table_3
    
    def compute_summary_stats_table(self,*args):
        """
        make summary table for statistical tests' pvalues and test conclusions 
        """
        training_date,current_date,chi2_threshold,cochrans_threshold,wilcoxon_threshold=args
        # get the distribution table for 3 groups 
        table_1,table_2,table_3=self.prepare_comparison_data(training_date,current_date)
        
        #prepare for the summary table which contains test data and pvalue for each stats test, drift threshold
        summary_p=pd.DataFrame({'test_date':current_date,
                                #chi2: https://en.wikipedia.org/wiki/Chi-squared_test
                                'chi2_contingency_pvalue':[chi2_contingency(t)[1] for t in [table_1,table_2,table_3]],
                                #https://en.wikipedia.org/wiki/Cochran%27s_Q_test
                                'cochrans_q_pvalue':[cochrans_q(t).pvalue for t in [table_1,table_2,table_3]],
                                #wilcoxon: https://en.wikipedia.org/wiki/Wilcoxon_signed-rank_test
                                #https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.wilcoxon.html
                                'wilcoxon_pvalue':[g[1] for g in [wilcoxon(t[0],t[1]) for t in [table_1,table_2,table_3]]],
        
                                'chi2_drift_threshold':chi2_threshold ,        
                                'cochrans_drift_threshold': cochrans_threshold,       
                                'wilcoxon_drift_threshold':wilcoxon_threshold},
                                index=['group_1','group_2','group_3'])
       
        #this section is to remove null p value and use valid pvalue to determine if drift has occured per test
        test=[]
        # loop over chi2_pvalue on each row
        for idx, row in summary_p[['chi2_contingency_pvalue']].iterrows():
            #this is criteria to determine whether drift has occrued by removing null and pvalue< threshold
            a=(row.notnull()&(row<chi2_threshold))
            # if drift occured, append this pvalue to test
            test.append(a['chi2_contingency_pvalue'])
        # create new result 
        t1=pd.DataFrame(test,index=['group_1','group_2','group_3'],columns=['chi2_contingency_drift'])
        # append to summary table
        summary_p=pd.concat([summary_p,t1],axis=1)
        
        # repeat the process for cochrans_q test
        test=[] 
        for idx, row in summary_p[['cochrans_q_pvalue']].iterrows():
            a=(row.notnull()&(row<cochrans_threshold))
            test.append(a['cochrans_q_pvalue'])
        t2=pd.DataFrame(test,index=['group_1','group_2','group_3'],columns=['cochrans_q_drift'])
        summary_p=pd.concat([summary_p,t2],axis=1)
        
        #count number of drift per test for final total_number_drift col
        summary_p['count_chi2_contingency']=np.where(summary_p.chi2_contingency_drift,1,0)
        summary_p['count_cochrans_q']=np.where(summary_p.cochrans_q_drift,1,0)       
        summary_p['wilcoxon_drift']=np.where(summary_p.wilcoxon_pvalue>wilcoxon_threshold,0,1)
        
        #total number of drift by adding up each drift col's result
        summary_p['number_of_drifts_detected']=summary_p.count_chi2_contingency+summary_p.count_cochrans_q+summary_p.wilcoxon_drift
        
        return summary_p[['test_date','chi2_contingency_pvalue', 'cochrans_q_pvalue', 'wilcoxon_pvalue',
       'chi2_drift_threshold','cochrans_drift_threshold','wilcoxon_drift_threshold','count_chi2_contingency','count_cochrans_q','wilcoxon_drift', 'number_of_drifts_detected']]        

In [48]:
# for d in rfm_df.snapshot_date.unique():
#     print(d,rfm_df[rfm_df.snapshot_date==d].rfm_score.unique())

In [9]:
rfmscore=OutRFMdrift(rfm_df)

## Note: 
- in the test result, we saw NaNs, is because there is NaN in RFM categories eg. nobody falls into RFM 0. We think imputation would be inappropriate as it will pollute the distribution and we can't impute 0 for chi2_contingency and cochrans_q tests
- However wilcoxon still works regardless of NaNs in the data


In [10]:
# Null hypothesis: same distribution for training data and current data
# if p value is less than 5%, drift has happened, see drift_detection col
rfmscore.compute_summary_stats_table('2021-08-24','2021-08-25',0.05,0.05,0.05)

  q_stat = ((k-1) * (k * np.sum(count_col_success**2) - count_col_ss**2)


Unnamed: 0,test_date,chi2_contingency_pvalue,cochrans_q_pvalue,wilcoxon_pvalue,chi2_drift_threshold,cochrans_drift_threshold,wilcoxon_drift_threshold,count_chi2_contingency,count_cochrans_q,wilcoxon_drift,number_of_drifts_detected
group_1,2021-08-25,,,0.283876,0.05,0.05,0.05,0,0,0,0
group_2,2021-08-25,1.0,0.44568,0.720405,0.05,0.05,0.05,0,0,0,0
group_3,2021-08-25,1.0,0.44568,1.0,0.05,0.05,0.05,0,0,0,0
