In [242]:
# @title Библиотеки

# BigQuery
from google.colab import auth
from google.cloud import bigquery
from google.colab import data_table

# Analytics
import pandas as pd
import seaborn as sns
import random
import matplotlib.pyplot as plt

# Tests
import statsmodels.api as sm
import numpy as np
from scipy import stats
from scipy.stats import ttest_ind
from statsmodels.stats.proportion import proportions_ztest
from scipy.stats import ttest_ind_from_stats
from scipy.stats import mannwhitneyu
from sklearn.ensemble import IsolationForest

# Other
import tqdm

In [250]:
def prepare_table_for_ab_test_1(data, channel, start_date, end_date):
    # Create a copy of the data to avoid SettingWithCopyWarning
    data = data.copy()

    # Your existing logic for handling different channels
    if channel in ['kassa', 'cc']:
        data = data.drop(['Franchisee Name', 'City', 'Name', 'Facility',
                          'AGC (Net)', 'Rating', 'Dish Quantity', 'Period',
                          'DoW', 'RestPick', 'Rest excl'], axis=1)
        data = data.rename(columns={'Code': 'restraunt_id',
                                    'Date': 'event_date', 'Net Sales': 'revenue',
                                    'Checs Qnt': 'order_success_count'})
    elif channel == 'kiosk':
        data = data.dropna()

    # Common processing
    data['channel'] = channel
    data['event_date'] = pd.to_datetime(data['event_date'], errors='coerce').dt.strftime('%Y-%m-%d')
    data['day_of_week'] = pd.to_datetime(data['event_date'], errors='coerce').dt.day_name()

    try:
        data['restraunt_id'] = data['restraunt_id'].astype(int)
        data['order_success_count'] = data['order_success_count'].astype(int)
    except ValueError as e:
        print(f"Error in type conversion: {e}")

    # Filter data based on date range
    data = data[(data['event_date'] > start_date) & (data['event_date'] < end_date)]

    return data


def clear_dataframe(df):
    df.drop(df.index, inplace=True)

# BigQuery Data Loading
class BigQueryUploadingData:
    def __init__(self, project, location, query_id):
        self.project = project
        self.location = location
        self.query_id = query_id
        self.client = bigquery.Client(project=project, location=location)

    def get_job(self):
        auth.authenticate_user()
        job = self.client.get_job(self.query_id)
        return job.to_dataframe()

    def upload(self):
        return self.get_job()

# File Data Loading
class FileUploadingData:
    def __init__(self, path):
        self.path = path

    def upload(self):
        return pd.read_excel(self.path)

# Data Preparation
class PrepareData:
    @staticmethod
    def get(data, channel, ab_test_name, start_date, end_date):
        # Specific logic for ab_test_1 and ab_test_2
        if ab_test_name in ['ab_test_1', 'ab_test_2']:
            return prepare_table_for_ab_test_1(data, channel, start_date, end_date)
        return data

# Data Concatenation
class Concatenation:
    @staticmethod
    def concat_data(a, b):
        return pd.concat([a, b], ignore_index=True)

# Data Splitting
class SplitData:
    @staticmethod
    def split_before_after(data, start_date):
        data['status'] = data['event_date'].apply(lambda x: 'After' if x > start_date else 'Before' if x < start_date else 'Start')
        return data

    @staticmethod
    def get(data, test_group, control_group, start_date):
        data = data.copy()
        test_data = data[data['restraunt_id'].isin(test_group)].copy()
        control_data = data[data['restraunt_id'].isin(control_group)].copy()

        test_data['group'] = 'test'
        control_data['group'] = 'control'

        return SplitData.split_before_after(data, start_date), SplitData.split_before_after(test_data, start_date), SplitData.split_before_after(control_data, start_date)


# Data Aggregation
class Aggregation:
    @staticmethod
    def drop_outliers(data, aggregator, lower_bound, upper_bound):
        lb = data[aggregator].quantile(lower_bound)
        ub = data[aggregator].quantile(upper_bound)
        return data[(data[aggregator] >= lb) & (data[aggregator] <= ub)]

    @staticmethod
    def aggregate(data, slice_type, aggregator, aggregation_type, parameter):
        filtered_data = data.query(parameter) if parameter != 'none' else data
        if slice_type and aggregator:
            return round(filtered_data.groupby(slice_type.split(', '), as_index=False).aggregate({aggregator: aggregation_type}), 2)
        return filtered_data

# Statistical Analysis
class Stats:

    @staticmethod
    def get_stats(data):
        return data.mean(), data.std(ddof=1), data.count()

    @staticmethod
    def mannwhitneyu_test(a, b, aggregator):
        _, p_value = mannwhitneyu(a[aggregator], b[aggregator])
        return p_value

    @staticmethod
    def two_sample_ttest(a, b, aggregator):
        # Check if 'a' and 'b' are DataFrames and 'aggregator' is a valid column
        if not isinstance(a, pd.DataFrame) or not isinstance(b, pd.DataFrame) or \
           aggregator not in a.columns or aggregator not in b.columns:
            raise ValueError("Input data must be DataFrames and the aggregator must be a valid column name.")
        abar, astd, na = Stats.get_stats(a[aggregator])
        bbar, bstd, nb = Stats.get_stats(b[aggregator])
        _, p_value = ttest_ind_from_stats(abar, astd, na,
                                    bbar, bstd, nb,
                                    equal_var=True)
        return p_value

    @staticmethod
    def hist_check(aggregated_data, x_axis, y_axis):
        plt.figure(figsize=(5, 3))
        sns.barplot(x=aggregated_data[x_axis], y=aggregated_data[y_axis])
        plt.title('Гистограмма на основе агрегированных данных')
        plt.xlabel(x_axis)
        plt.ylabel(y_axis)
        plt.show()

    @staticmethod
    def test_control_hist(data, AA_test_pass_list, test_group, slice_type, aggregator, aggregation_type, parameter):
        data_1 = Aggregation.aggregate(data[data['restraunt_id'].isin(AA_test_pass_list[4])].sort_values(['revenue', 'event_date']).query('channel == "kiosk"'), slice_type, aggregator, aggregation_type, parameter)
        data_2 = Aggregation.aggregate(data[data['restraunt_id'].isin(test_group)].sort_values(['revenue', 'event_date']).query('channel == "kiosk"'), slice_type, aggregator, aggregation_type, parameter)
        Stats.hist_check(data_1, slice_type, aggregator)
        Stats.hist_check(data_2, slice_type, aggregator)
        return data_1, data_2


class ABTest:
    def __init__(self, config):
        self.config = config

    def load_data(self):
        # Loading data from various sources
        self.kiosk_data = BigQueryUploadingData(self.config['project'], self.config['location'], self.config['query_id']).upload()
        self.kassa_data = FileUploadingData(self.config['kassa_path']).upload()
        self.cc_data = FileUploadingData(self.config['cc_path']).upload()

    def preprocess_all_data(self):
        if not self.kassa_data.empty and not self.cc_data.empty:
            self.kiosk_data = self.preprocess_data('kiosk')
            self.kassa_data = self.preprocess_data('kassa')
            self.cc_data = self.preprocess_data('cc')
        self.merged_data = self.merge_data()

    def preprocess_data(self, data_type):
        # Preprocessing data based on type
        data = getattr(self, f"{data_type}_data")
        return PrepareData.get(data, data_type, self.config['name'], self.config['data_collect_start_date'], self.config['data_collect_end_date'])

    def merge_data(self):
        # Merging different data sources
        return Concatenation.concat_data(self.kiosk_data, Concatenation.concat_data(self.cc_data, self.kassa_data))

    def get_split(self, data, test_group, control_group):
        # Splitting data into test and control groups
        return SplitData.get(data, test_group, control_group, self.config['start_date'])

    def aggregate_data(self, data, period_filter=None):
        """ Aggregates data with optional period filtering """
        if period_filter:
            data = data.query(f'status == "{period_filter}"')
        config = self.config
        filtered_data = Aggregation.drop_outliers(data, config['aggregator'], config['lower_bound'], config['upper_bound'])
        aggregated_data = Aggregation.aggregate(filtered_data, config['slice_type'], config['aggregator'], config['aggregation_type'], config['parameter'])
        return aggregated_data

    def get_aggregated_data_before(self, data, test_data, control_data):
        """ Aggregate data for the period before the AB test """
        test_data_before = self.aggregate_data(test_data, 'Before')
        control_data_before = self.aggregate_data(control_data, 'Before')

        return test_data_before, control_data_before

    def get_aggregated_data_after(self, data, test_data, control_data):
        """ Aggregate data for the period after the AB test """
        test_data_after = self.aggregate_data(test_data, 'After')
        control_data_after = self.aggregate_data(control_data, 'After')

        return test_data_after, control_data_after

    def generate_control_group(self, test_group, data):
        # Generating control group
        return np.array([random.choice(data['restraunt_id']) for _ in test_group])

    def get_stats(data, aggregator):
        # Calculating statistics
        return data[aggregator].mean(), data[aggregator].std(ddof=1), data[aggregator].count()

    def execute(self, test_group):
        self.load_data()
        self.preprocess_all_data()
        AA_test_pass_list = []
        AA_results = []
        AB_results = []

        ##################### BOOTSTRAP ####################
        for _ in tqdm.tqdm(range(300), desc="Выполнение", position=0, leave=True):
            control_group = self.generate_control_group(test_group, self.merged_data)
            _, test_data, control_data = self.get_split(self.merged_data, test_group, control_group)
            agg_test_data_before, agg_control_data_before = self.get_aggregated_data_before(self.merged_data, test_data, control_data)
            t_test_result_before = Stats.two_sample_ttest(agg_test_data_before, agg_control_data_before, self.config['aggregator'])
            if 0.85 < t_test_result_before < 2:
                AA_test_pass_list.append(control_group)
                AA_results.append(self.compile_results(agg_test_data_before, agg_control_data_before, t_test_result_before))

        for control_group in AA_test_pass_list:
            _, test_data, control_data = self.get_split(self.merged_data, test_group, control_group)
            agg_test_data_after, agg_control_data_after = self.get_aggregated_data_after(self.merged_data, test_data, control_data)
            t_test_result_after = Stats.two_sample_ttest(agg_test_data_after, agg_control_data_after, self.config['aggregator'])
            AB_results.append(self.compile_results(agg_test_data_after, agg_control_data_after, t_test_result_after))

        return {
            'merged_data': self.merged_data,
            'AA_test_pass_list': AA_test_pass_list,
            'AA_average_p_value': np.mean([result['p_value'] for result in AA_results]),
            'AB_average_p_value': np.mean([result['p_value'] for result in AB_results]),
            'AA_results': AA_results,
            'AB_results': AB_results
        }

    def compile_results(self, test_data, control_data, t_test_result):
        return {
            'mean_test_before': test_data[self.config['aggregator']].mean(),
            'mean_control_before': control_data[self.config['aggregator']].mean(),
            'std_test_before': test_data[self.config['aggregator']].std(),
            'std_control_before': control_data[self.config['aggregator']].std(),
            'p_value': t_test_result
        }

A/B тест 1

Сравнение ключевых метрик - revenue, количества заказов, AOV по каналам "Kiosk", "Kassa", "C&C".

In [251]:
configurations = [
    {
        'slice_type': 'event_date',
        'aggregator': 'revenue',
        'aggregation_type': 'mean',
        'parameter': 'none'
    },
    {
        'slice_type': 'event_date',
        'aggregator': 'order_success_count',
        'aggregation_type': 'sum',
        'parameter': 'channel == "kiosk"'
    },
    {
        'slice_type': 'event_date',
        'aggregator': 'order_success_count',
        'aggregation_type': 'sum',
        'parameter': 'channel == "kassa"'
    },
    {
        'slice_type': 'event_date',
        'aggregator': 'order_success_count',
        'aggregation_type': 'sum',
        'parameter': 'channel == "cc"'
    },
]

# Common configuration
common_config = {
    'project': 'kfc-kiosk-3',
    'location': 'US',
    'query_id': 'bquxjob_546c80a9_18ca5bab174',
    'cc_path': '/content/cc.xlsx',
    'kassa_path': '/content/kassa.xlsx',
    'name': 'ab_test_1',
    'start_date': '2023-12-15',
    'data_collect_start_date': '2023-12-01',
    'data_collect_end_date': '2023-12-20',
    'lower_bound': 0,
    'upper_bound': 1
}

# Test group IDs
test_group = [74020587, 74013270, 74020449, 74021433, 74021788, 74020871, 74020660, 74021978, 74020896,
              74012184, 74020851, 74020828, 74021003, 74021914, 74021880, 74021975, 74321670, 74321666,
              74215106, 74021329, 74021678, 74021302, 74021639]

# DataFrame to store results
results_df = pd.DataFrame()

for config in tqdm.tqdm(configurations):
    # Ваш код здесь

    # Merge the specific configuration with the common configuration
    ab_test_config = {**common_config, **config}

    # Run the AB test
    ab_test = ABTest(ab_test_config)
    results = ab_test.execute(test_group)

    # Assuming results is the output from ab_test.execute(test_group)
    AA_mean_test_before = np.mean([res['mean_test_before'] for res in results['AA_results']])
    AA_mean_control_before = np.mean([res['mean_control_before'] for res in results['AA_results']])
    AB_mean_test_after = np.mean([res['mean_test_before'] for res in results['AB_results']])  # Adjust as needed
    AB_mean_control_after = np.mean([res['mean_control_before'] for res in results['AB_results']])  # Adjust as needed

    new_row = {
        'Метрика': ab_test_config['aggregator'],
        'Тип': ab_test_config['aggregation_type'],
        'Канал': ab_test_config['parameter'],
        'Метрика ДО test-группы': round(AA_mean_test_before, 0),
        'Метрика ДО control-группы': round(AA_mean_control_before, 0),
        'Метрика ПОСЛЕ test-группы': round(AB_mean_test_after, 2),
        'Метрика ПОСЛЕ control-группы': round(AB_mean_control_after, 2),
        'Прирост test-группы, %': round((AB_mean_test_after / AA_mean_test_before - 1) * 100, 3),
        'Прирост control-группы, %': round((AB_mean_control_after / AA_mean_control_before - 1) * 100, 3),
        'AB p_value': np.mean([res['p_value'] for res in results['AB_results']]),
        'AA p_value': np.mean([res['p_value'] for res in results['AA_results']])
    }

    # Check if results DataFrame exists, initialize if not
    if 'results_ab_test_1' not in globals():
        results_ab_test_1 = pd.DataFrame(columns=new_row.keys())

    # Append the new row to the DataFrame
    results_ab_test_1 = results_ab_test_1.append(new_row, ignore_index=True)

results_ab_test_1.to_excel('/content/ab_test_1_results.xlsx', index=False)


Выполнение: 100%|██████████| 300/300 [00:09<00:00, 30.18it/s]
  results_ab_test_1 = results_ab_test_1.append(new_row, ignore_index=True)
Выполнение: 100%|██████████| 300/300 [00:11<00:00, 26.77it/s]
  results_ab_test_1 = results_ab_test_1.append(new_row, ignore_index=True)
Выполнение: 100%|██████████| 300/300 [00:11<00:00, 26.15it/s]
  results_ab_test_1 = results_ab_test_1.append(new_row, ignore_index=True)
Выполнение: 100%|██████████| 300/300 [00:14<00:00, 20.70it/s]
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  results_ab_test_1 = results_ab_test_1.append(new_row, ignore_index=True)
100%|██████████| 4/4 [01:26<00:00, 21.56s/it]


In [233]:
clear_dataframe(results_ab_test_1)

In [237]:
results_ab_test_1.to_excel('/content/ab_test_1_results.xlsx', index=False)

In [252]:
results_ab_test_1

Unnamed: 0,Метрика,Тип,Канал,Метрика ДО test-группы,Метрика ДО control-группы,Метрика ПОСЛЕ test-группы,Метрика ПОСЛЕ control-группы,"Прирост test-группы, %","Прирост control-группы, %",AB p_value,AA p_value
0,revenue,mean,none,67211.0,65364.0,73540.62,72930.39,9.417,11.575,0.627211,0.321019
1,order_success_count,sum,"channel == ""kiosk""",4424.0,4675.0,3956.0,5141.44,-10.583,9.982,0.084382,0.300177
2,order_success_count,sum,"channel == ""kassa""",3988.0,3866.0,4739.5,4118.01,18.849,6.517,0.226371,0.322525
3,order_success_count,sum,"channel == ""cc""",2316.0,2079.0,2412.5,2089.0,4.146,0.481,0.078196,0.025586
4,revenue,mean,none,67211.0,67173.0,73540.62,74520.77,9.417,10.939,0.865197,0.935954
5,order_success_count,sum,"channel == ""kiosk""",4424.0,4439.0,3956.0,4865.19,-10.583,9.603,0.166899,0.90228
6,order_success_count,sum,"channel == ""kassa""",3988.0,3991.0,4739.5,4223.7,18.849,5.831,0.21174,0.944105
7,order_success_count,sum,"channel == ""cc""",,,,,,,,
8,revenue,mean,none,67211.0,67246.0,73540.62,75243.99,9.417,11.894,0.847275,0.924769
9,order_success_count,sum,"channel == ""kiosk""",4424.0,4428.0,3956.0,4895.98,-10.583,10.565,0.106092,0.928039


A/B тест 2

Сравнение метрик - Conversion Rate, Avg Products in order по каналу "Kiosk"