In [1]:
from typing import List
import copy
import itertools
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf
from stratification.params import SplitBuilderParams
from stratification.split_builder import build_split, prepare_cat_data, assign_strata
from prepilot.experiment_structures import BaseSplitElement


class PrepilotSplitBuilder():
    """Columns with splits and injects will be added
    """
    def __init__(self,
                 spark,
                 guests,
                 group_sizes: List[int],
                 stratification_params: SplitBuilderParams,
                 iterations_number: int = 10):
        """There is class for calculation columns with injetcs and target/control splits

        Args:
            guests: dataframe with data for calculations injects and splits
            metrics_names: list of metrics for which will be calculate injects columns
            group_sizes: list of group sizes for split building
            stratification_params: stratification parameters
            iterations_number: number of columns that will be build for each group size

        """
        self.guests = guests.withColumn("partion", F.lit(1))
        self.spark = spark
        self.iterations_number = iterations_number
        self.group_sizes = group_sizes
        self.stratification_params = copy.deepcopy(stratification_params)
        self.split_grid = self._build_splits_grid()

    def _build_splits_grid(self):
        return list(BaseSplitElement(el[0], el[1])
                    for el in itertools.product(self.group_sizes, np.arange(1, self.iterations_number+1)))
    

    def apply_strata(self):

        def strata_generator(df, stratification_params):
            schema = (copy.deepcopy(df.schema)
                    .add(T.StructField("strata", T.StringType(), False))
            )

            @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
            def build_strata_pd(df: pd.DataFrame):

                guests_data = prepare_cat_data(df, stratification_params)
                guests_data_with_strata = assign_strata(guests_data.reset_index(drop=True), stratification_params)

                return guests_data_with_strata

            return build_strata_pd

        strata = strata_generator(self.guests, self.stratification_params)
        strata_result = self.guests.groupBy("partion").apply(strata)

        return strata_result


    def build_split(self, 
                    guests_with_strata,
                    split: BaseSplitElement):

        def build_split_generator(stratification_params, 
                            split: BaseSplitElement):

            schema = T.StructType([T.StructField("split_group_sizes", T.StringType(), False),
                                T.StructField("split_number", T.IntegerType(), False),
                                T.StructField("customer_rk", T.LongType(), False),
                                T.StructField("is_control", T.IntegerType(), False)])

            @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
            def build_split_pd(df: pd.DataFrame):
                map_group_names_to_sizes={
                    "control": split.control_group_size,
                    "target": split.target_group_size
                }

                stratification_params.map_group_names_to_sizes = map_group_names_to_sizes

                guests_groups = build_split(df, stratification_params)
                guests_groups = guests_groups.join(
                                pd.get_dummies(guests_groups["group_name"])
                                .add_prefix("is_")
                )
                
                guests_groups["split_group_sizes"] = f"{split.control_group_size}_{split.target_group_size}"
                guests_groups["split_number"] = split.split_number
                            
                result = pd.DataFrame({"split_group_sizes": guests_groups["split_group_sizes"],
                                       "split_number": guests_groups["split_number"],
                                       "customer_rk": guests_groups["customer_rk"],
                                       "is_control": guests_groups["is_control"]
                                     })

                return result
            return build_split_pd

        split = build_split_generator(self.stratification_params,
                                      split)
        split_result = guests_with_strata.groupBy("partion").apply(split)

        return split_result

    def collect(self):
        """Calculate multiple split with stratification

        Returns: DataFrame with split column

        """
        schema = T.StructType([T.StructField("split_group_sizes", T.StringType(), False),
                            T.StructField("split_number", T.IntegerType(), False),
                            T.StructField("customer_rk", T.LongType(), False),
                            T.StructField("is_control", T.IntegerType(), False)])

        result = self.spark.createDataFrame([], schema)
        data_with_strata = self.apply_strata()

        for split_param in self.split_grid:
            split_data = self.build_split(data_with_strata, split_param)
            result = result.unionAll(split_data)
        
        return result


In [3]:
import datetime
from prepilot.params import PrepilotParams

datestart=datetime.date(2021, 4, 20)
datepostperiod=datetime.date(2021, 4, 28)
prepilot_params = PrepilotParams(
    datestart=datestart,
    datepostperiod=datepostperiod,
    metrics_names=['rto_novat'],
    injects=[1.01, 1.02, 1.03, 1.04, 1.05, 1.06, 1.07, 1.08, 1.09, 1.1,1.15,1.2,1.25],
    min_group_size=1000, 
    max_group_size=3000, 
    step=1000,
    iterations_number = 3,
    max_beta_score=0.2,
    min_beta_score=0.05,
    experiment_plu_codes=[''],
    experiment_synth_catalog_ids=[-1]
)

In [4]:
split_builder_params = SplitBuilderParams(
    map_group_names_to_sizes={
        'control': 10000,
        'target': 10000
    },
    cols=[
        'region',
        'rto',
        #'fm',
        #'points_balance',
        #'exp_points_balance',
        #'offer_rk_goal',
        #'offer_rk_campaign',
        #'rto_synthetic_cat',
        #'fm_synthetic_cat',
        #'count_checks_synthetic_cat'
    ],
    cat_cols=[
        #'offer_rk_goal',
        #'offer_rk_campaign'
    ],
    pvalue=0.05,
    n_top_cat=100,
    stat_test="ttest_ind"
)

In [5]:
import sys
import os
import copy
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sys.path.append(os.path.dirname(os.path.abspath('')))

# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("gbc_ab_pyspark")
         # Add postgres jar
         #.config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-9.4.1207.jar")
         .getOrCreate())
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/19 12:40:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema",True)
    .option("sep", ";")
    .load("TLO.csv")
)

                                                                                

In [7]:
df = (df.withColumnRenamed("customer_id","customer_rk")
        .withColumnRenamed("moda_city","region")
        .withColumnRenamed("post_count_orders","rto")
)

In [8]:
prepilot_guests_collector = PrepilotSplitBuilder(spark, df,
                                                [(10000,10000), (20000,20000),(30000,30000)],
                                                split_builder_params,
                                                5)

In [9]:
splited_df = prepilot_guests_collector.collect()



In [10]:
splited_df.count()

ERROR:root:KeyboardInterrupt while sending command.                 (0 + 1) / 1]
Traceback (most recent call last):
  File "/Users/egorshishkovets/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/egorshishkovets/opt/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/egorshishkovets/opt/anaconda3/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                

In [10]:
import pandas as pd
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf
from stratification.params import SplitBuilderParams
from prepilot.abstract_experiment_builder import AbstractExperimentBuilder
from prepilot.params import PrepilotParams
from post_analysis.stat_test import PeriodStatTest


_ERROR_RES_SCHEMA = T.StructType([
                    T.StructField("group_sizes", T.StringType(), False),
                    T.StructField("metric", T.StringType(), False),
                    T.StructField("MDE", T.FloatType(), False),
                    T.StructField("is_effect_found", T.IntegerType(), False)
                    ])

class PrepilotExperimentBuilder(AbstractExperimentBuilder):
    """Calculates I and II type errors for different group sizes and injects
    """

    def __init__(self,
                 spark,
                 guests: DataFrame,
                 experiment_params: PrepilotParams):
        """
        Args:
            guests: dataframe that collected by PrepilotGuestsCollector
            experiment_params: prameters for prepilot experiments

        """
        super().__init__(spark, 
                         guests,
                         experiment_params)
        self._number_of_decimals = 10

    def _calc_experiment(self,
                         guests_with_splits,
                         res_schema,
                         metric_name,
                         inject=1.0):

        def calc_split_experiment_generator(stat_test_params, 
                                            metric_name, 
                                            inject=1.0):

            @pandas_udf(res_schema, F.PandasUDFType.GROUPED_MAP)
            def calc_split_experiment_pd(guests_with_splits):
                """Calculates stat test for one experiment grid element

                Args:
                    guests_with_splits: dataframe with calculated splits for experiment

                Returns: pandas DataFrame with calculated stat test and experiment parameters

                """
                group_sizes = guests_with_splits.split_group_sizes.values[0]
                control = guests_with_splits[guests_with_splits[metric_name] == 0][metric_name].values
                target = guests_with_splits[guests_with_splits[metric_name] == 1][metric_name].values * inject
                stat_test = PeriodStatTest(target, control, "", stat_test_params)
                stat_result = stat_test.calculate_period_effect()
                return pd.DataFrame({"group_sizes": [group_sizes],
                                    "metric": [metric_name],
                                    "MDE": [inject],
                                    "is_effect_found": stat_result["effect__significance"]
                                    })

            return calc_split_experiment_pd

        calculate_experiment = calc_split_experiment_generator(self.stat_test_params, metric_name, inject)
        result_df = guests_with_splits.groupBy(["split_group_sizes", "split_number" ]).apply(calculate_experiment)
        return result_df    

    def calc_alpha_error(self,
                         guests_with_splits,
                         res_schema
                         ):

        alpha_error_result = self.spark.createDataFrame([], res_schema)

        for metric_name in self._experiment_params.metrics_names:
            alpha_error_df = self._calc_experiment(guests_with_splits,
                                                   res_schema, 
                                                   metric_name)
            alpha_error_result = alpha_error_result.unionAll(alpha_error_df)
        
        alpha_agg = (alpha_error_result
            .groupBy(["metric", "group_sizes"])
            .agg(F.avg("is_effect_found").alias("alpha_error"))
        )
        return alpha_agg.groupBy("metric").pivot("group_sizes").sum("alpha_error")

    def calc_beta_error(self,
                        guests_with_splits,
                        res_schema
                        ):

        beta_error_result = self.spark.createDataFrame([], res_schema)
        for inject in self._experiment_params.injects:
            for metric_name in self._experiment_params.metrics_names:
                beta_error_df = self._calc_experiment(guests_with_splits,
                                                      res_schema,
                                                      metric_name,
                                                      inject)
                beta_error_result = beta_error_result.unionAll(beta_error_df)
        
        beta_agg = (beta_error_result
            .groupBy(["metric", "group_sizes", "MDE"])
            .agg(1.0 - F.avg("is_effect_found").alias("beta"))
        )

        return beta_agg.groupBy(["metric", "MDE"]).pivot("group_sizes").sum("beta")


    def collect(self, stratification_params: SplitBuilderParams) -> pd.DataFrame:
        """Calculates I and II types error using prepilot parameters.

        Args:
            stratification_params: params for stratification
            full: if True function will return full dataframe with results.
            Otherwise will be returned only max calculated MDE for each size.

        Returns: pandas DataFrames with aggregated results of experiment.

        """
        prepilot_split_builder = PrepilotSplitBuilder(self.spark, 
                                                        self.guests,
                                                        self.group_sizes,
                                                        stratification_params,
                                                        3)#self._experiment_params.iterations_number)

        prepilot_guests = prepilot_split_builder.collect()
        prepilot_guests = (prepilot_guests.join(self.guests, 
                                                on="customer_rk", 
                                                how="inner")
                        .select(prepilot_guests.columns+self._experiment_params.metrics_names)
        )

        beta = self.calc_beta_error(prepilot_guests, _ERROR_RES_SCHEMA)
        alpha = self.calc_alpha_error(prepilot_guests, _ERROR_RES_SCHEMA)
        return beta, alpha


In [11]:
datestart=datetime.date(2021, 4, 20)
datepostperiod=datetime.date(2021, 4, 28)
prepilot_params = PrepilotParams(
    datestart=datestart,
    datepostperiod=datepostperiod,
    metrics_names=['rto'],
    injects=[1.01, 1.02, 1.03],
    min_group_size=10000, 
    max_group_size=30000, 
    step=10000,
    iterations_number = 30,
    max_beta_score=0.2,
    min_beta_score=0.05,
    experiment_plu_codes=[''],
    experiment_synth_catalog_ids=[-1]
)

In [12]:
prepilot = PrepilotExperimentBuilder(spark, df,
                                     prepilot_params)

In [13]:
beta, alpha = prepilot.collect(split_builder_params)

                                                                                

In [14]:
beta.show(10)



+------+----+-----------+-----------+-----------+
|metric| MDE|10000_10000|20000_20000|30000_30000|
+------+----+-----------+-----------+-----------+
|   rto|1.01|        1.0|        1.0|        1.0|
|   rto|1.02|        1.0|        1.0|        1.0|
|   rto|1.03|        1.0|        1.0|        1.0|
+------+----+-----------+-----------+-----------+



                                                                                

In [15]:
alpha.show(10)



+------+-----------+-----------+-----------+
|metric|10000_10000|20000_20000|30000_30000|
+------+-----------+-----------+-----------+
|   rto|        1.0|        1.0|        1.0|
+------+-----------+-----------+-----------+



                                                                                

In [124]:
beta.show(1)

[Stage 34:>                                                         (0 + 1) / 1]

+-----------+------+---+---------------+
|group_sizes|metric|MDE|is_effect_found|
+-----------+------+---+---------------+
|10000_10000|   rto|1.0|              1|
+-----------+------+---+---------------+
only showing top 1 row



                                                                                

In [20]:
t = (splited_df.groupBy(["split_group_sizes"])
            .agg(1.0 - F.avg("is_control").alias("beta")))

In [21]:
t.show(10)



+-----------------+-------------------------------+
|split_group_sizes|(1.0 - avg(is_control) AS beta)|
+-----------------+-------------------------------+
|      10000_10000|            0.49992495121829184|
|      20000_20000|                            0.5|
|      30000_30000|             0.5000083320835208|
+-----------------+-------------------------------+



                                                                                

In [22]:
t1 = (splited_df.groupBy(["split_group_sizes"])
            .agg(F.avg("is_control").alias("beta")))

In [23]:
t1.show(10)



+-----------------+------------------+
|split_group_sizes|              beta|
+-----------------+------------------+
|      10000_10000|0.5000750487817082|
|      20000_20000|               0.5|
|      30000_30000|0.4999916679164792|
+-----------------+------------------+



                                                                                