In [56]:
import logging
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np

class AB_test():
    """
        ???
    """

    def __init__(self, df: pd.DataFrame, first_level: str, high_level = None):
        """
        Parameters
        ----------
        ???
        """
        self.__version__ = 0.0
        if not isinstance(df, pd.DataFrame):
            raise Exception('Need to provide Pandas DataFrame')
        if not isinstance(first_level, str):
            raise Exception('First_level must be string') 
        if first_level not in df.columns:
            raise Exception('First level column not exists') 
        if df[first_level].isnull().sum() > 0:
            raise Exception('First level contains NaN') 
        
        logger_format = '%(name)s: %(levelname)-8s %(message)s'
        logging.basicConfig(format=logger_format)
        self.logger = logging.getLogger('AB Tester')
        self.logger.setLevel(logging.INFO)
        self._first_level_column = first_level
        self._high_level_column = high_level
        if high_level is not None:
            if not isinstance(high_level, str):
                raise Exception('high_level must be string')
            self._high_level_column = high_level
            self._high_level_a_group = set()
            self._high_level_b_group = set()
            df = self._prepare_high_level(df)
        self._a_b_candidates = [] 
        self.df = df
    
    def _prepare_high_level(self, df):
        if self._high_level_column not in df.columns:
            raise Exception('High level column not exists')
        mask=(df[self._high_level_column].isnull())
        if mask.sum() > 0:
            df[self._high_level_column+'_fixed'] = df[self._high_level_column].copy()
            df.loc[mask,self._high_level_column+'_fixed']=df.loc[mask, self._first_level_column] 
            self._high_level_column = self._high_level_column+'_fixed'
            self.logger.info('Nan in second level were replaced')
        return df
    
    @staticmethod
    def _minmax_normalizer(x):
        mn, mx = x.min(), x.max()
        return (x - mn) / (mx - mn + 0.0000001)
    
    @staticmethod
    def _distance_score(first_features: np.array, second_features: np.array):
        if not isinstance(first_features, np.ndarray) and not isinstance(first_features, np.ndarray):
            raise Exception('Input values must be np.array type')
        if first_features.shape[0]!= second_features.shape[0]:
            raise Exception('Input values must be the same lenght')
        else:
            return np.sqrt(np.sum((first_features-second_features)**2))

    
    def classic_split(self, features: list):
        
        def get_pair(df: pd.DataFrame):
            if df.shape[0]<2:
                return df, True
            first_row = df.iloc[[0]]
            temp_df = df.iloc[1:].copy()
            first_candidate = first_row[self._first_level_column].values[0]
            if self._high_level_column is not None:
                first_candidate_parent = first_row[self._high_level_column].values[0]
            first_candidate_features = np.array(first_row['features_list'].values.tolist())
            
            if self._high_level_column is not None:
                
                if first_candidate_parent not in self._high_level_b_group:
                    self._high_level_a_group.add(first_candidate_parent)
                    temp_df['map_a_group']=temp_df[self._high_level_column].isin(self._high_level_a_group)
                    temp_df=temp_df[~temp_df['map_a_group']]
                    if temp_df.shape[0]==0:
                        return df, True
                    temp_df['distance_score'] = temp_df['features_list'].apply(lambda x: self._distance_score(first_candidate_features, x))                         
                    temp_df=temp_df.sort_values('distance_score').iloc[[0]]
                    competitor = temp_df[self._first_level_column].values[0]
                    curent_score = temp_df['distance_score'].values[0]
                    competitor_parent=temp_df[self._high_level_column].values[0]
                    self._high_level_b_group.add(competitor_parent)
                    self._a_b_candidates.append([first_candidate, first_candidate_parent, competitor, competitor_parent, curent_score])
                    df = df[df[self._first_level_column]!=first_candidate]
                    df = df[df[self._first_level_column]!=competitor]
                    return df, False
                
                if first_candidate_parent in self._high_level_b_group:
                    self._high_level_b_group.add(first_candidate_parent)
                    temp_df['map_a_group']=temp_df[self._high_level_column].isin(self._high_level_a_group)
                    temp_df=temp_df[temp_df['map_a_group']]
                    if temp_df.shape[0]==0:
                        return df, True
                    temp_df['distance_score'] = temp_df['features_list'].apply(lambda x: self._distance_score(first_candidate_features, x))                         
                    temp_df=temp_df.sort_values('distance_score').iloc[[0]]
                    competitor = temp_df[self._first_level_column].values[0]
                    curent_score = temp_df['distance_score'].values[0]
                    competitor_parent=temp_df[self._high_level_column].values[0]
                    self._high_level_a_group.add(competitor_parent)
                    self._a_b_candidates.append([competitor, competitor_parent, first_candidate, first_candidate_parent, curent_score])
                    df = df[df[self._first_level_column]!=first_candidate]
                    df = df[df[self._first_level_column]!=competitor]  
                    return df, False
                
            else:   
                temp_df['distance_score'] = temp_df['features_list'].apply(lambda x: self._distance_score(first_candidate_features, x))                         
                temp_df=temp_df.sort_values('distance_score').iloc[[0]]
                competitor = temp_df[self._first_level_column].values[0]
                curent_score = temp_df['distance_score'].values[0]
                competitor_parent=temp_df[self._high_level_column].values[0]
                self._a_b_candidates.append([first_candidate, first_candidate_parent, competitor, competitor_parent, curent_score])
                df = df[df[self._first_level_column]!=first_candidate]
                df = df[df[self._first_level_column]!=competitor]   
        
        if not isinstance(features, list):
            raise Exception('Feaures must be list type')
        if len(set(features) - set(self.df.columns))!=0:
            raise Exception('Feaures contain ')
        df=self.df.copy()
        if df.shape[0]<2:
            raise Exception('Input must have at least 2 rows')
        
        for feature in features:
            df[feature+'_scaled'] = self._minmax_normalizer(df[feature])
        features = list(map(lambda x: x + '_scaled', features))        
        df['features_list'] = df[features].values.tolist()
        df['features_list'] = df['features_list'].apply(np.array)
        
        flag=False
        while not flag:
            df, flag = get_pair(df)
        del df, flag 
        
        if self._high_level_column is not None:
            df = pd.DataFrame(self._a_b_candidates, columns=['A group','A_parent','B group', 'B parent', 'Distance score'])
        else:
            df = pd.DataFrame(self._a_b_candidates, columns=['A group', 'B group', 'Distance score'])


     
            
            
            
            

a=AB_test(df,'site_product_id','parent_product_id')

AB Tester: INFO     Nan in second level were replaced


In [57]:
a.classic_split(['avg_listing_price'])

KeyError: "None of [Index(['avg_listing_price_scaled'], dtype='object')] are in the [columns]"

In [50]:
len(a._high_level_b_group)

984

In [47]:
len(set(a._high_level_b_group))

984

In [51]:
pd.DataFrame(a._a_b_candidates)

Unnamed: 0,0,1,2,3,4
0,B09N2RY9B6,B09N2RY9B6,B09Q69X8W7,B09Q69X8W7,0.000000
1,B09Q6F9YHQ,B09Q6F9YHQ,B07NTP66XT,B07NTP66XT,0.000000
2,B091GQ3JQ4,B091GQ3JQ4,B09CDTP62P,B09CDTP62P,0.000000
3,B08F9LP18T,B08F9LP18T,B07H5ZXZHJ,B07H5ZXZHJ,0.000000
4,B07MH322KY,B07MH322KY,B09PNQK5YF,B09PNQK5YF,0.000000
...,...,...,...,...,...
989,B097R8H3GY,B097R8H3GY,B083BWZSG5,B083BWZSG5,0.028136
990,B078HPRDWW,B078HPRDWW,B07MNXR7NR,B07MNXR7NR,0.000000
991,B085XJT2FC,B085XJT2FC,B07MPPTTRD,B07MPPTTRD,0.000000
992,B01MYQD265,B01MYQD265,B079DD2GMR,B079DD2GMR,0.000000


In [4]:
import awswrangler as wr

query = '''with asins as (
select 
distinct
site_product_id 
from data_lake_public.fact_listings_daily 
where date(date)= date('2022-02-15')
and store_id = 6010
and is_active = true
--and is_repriced = false
)

, parent as (
select distinct 
site_product_id ,
parent_product_id
from data_lake_public.amazon_products_attributes
where parent_product_id in (select site_product_id from asins)
) 

,test_asins as(
select
distinct
asn.site_product_id,
parent_product_id
from asins asn left join parent prn 
on asn.site_product_id = prn.site_product_id
)


, avg_price as (
select
site_product_id ,
avg(avg_listing_price) as avg_price
from data_lake_public.fact_listings_daily
where date(date) >= date('2022-02-07') and date(date) <= date('2022-02-14')
and site_product_id in (select distinct site_product_id from test_asins)
and store_id = 6010
group by 1
)

, sales as (
select 
site_product_id ,
sum(total_sales) as total_sales ,
sum(ordered_quantity) as orders
from data_lake_public.fact_listing_order_items floi
where date(purchase_date) >= date('2022-02-07') and date(purchase_date) <= date('2022-02-14')
and status not in ('CANCELED')
and store_id = 6010
and site_product_id in (select distinct site_product_id from test_asins)
group by 1
)

, sessions as (
select 
child_asin ,
sum(sessions) as sessions 
from intelligence.business_report 
where store_id = 6010
and date(date) >= date('2022-02-07') and date(date) <= date('2022-02-14')
group by 1
)

, res as (
select 
pas.site_product_id ,
pas.parent_product_id,
case when sls.total_sales is null then 0 else total_sales end total_sales,
case when sls.orders is null then 0 else orders end orders ,
case when avg_price is null then 0 else avg_price end avg_listing_price,
case when sessions is null then 0 else sessions end sessions
from test_asins pas 
join avg_price avgp 
on pas.site_product_id = avgp.site_product_id
left join sales sls 
on pas.site_product_id  = sls.site_product_id
left join sessions sss
on pas.site_product_id = sss.child_asin
)


select * from res'''
df = wr.athena.read_sql_query(query, database='intelligence', ctas_approach=False)

In [36]:
df[df['site_product_id']=='B09N2RY9B6']

Unnamed: 0,site_product_id,parent_product_id,total_sales,orders,avg_listing_price,sessions,parent_product_id_fixed
0,B09N2RY9B6,,931.0,104,9.47,0,B09N2RY9B6
