In [1]:
import os
import sys
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import Window, functions as F, types as T

In [2]:
os.environ['PYSPARK_PYTHON']= sys.executable
spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.executor.cores", "2")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

24/09/23 12:11:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
schema = T.StructType([
    T.StructField('camp_id', T.StringType()),
    T.StructField('control_group_flg', T.ShortType()),
    T.StructField('revenue', T.DoubleType()),
    T.StructField('margin', T.DoubleType()),
    T.StructField('conversion', T.ShortType()),
    T.StructField('user_id', T.StringType()),
])

statistics_df = spark.createDataFrame(pd.read_csv('data_sample.csv'))
statistics_df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+-------+-----------------+-------+------+----------+--------------------+
|camp_id|control_group_flg|revenue|margin|conversion|             user_id|
+-------+-----------------+-------+------+----------+--------------------+
|ED_3755|                0|   4.77|   5.3|         1|6f9be979ec9f4aba9...|
|ED_3755|                0|   7.53|  8.12|         1|6a0a4bdd10e670717...|
|ED_3755|                0|  12.33|  5.24|         1|2b96123b22c810b53...|
|ED_3755|                0|   5.44|  4.32|         1|06d37e5b5d6c45a9a...|
|ED_3755|                0|    0.0|   0.0|         0|86fcb416dd0d2b9e7...|
+-------+-----------------+-------+------+----------+--------------------+
only showing top 5 rows



                                                                                

In [4]:
# Список ключевых столбцов групп
KEY_COLS = ['camp_id', 'control_group_flg']
# Список метрик, по которым считаем статистику
STATISTICS_COLS = ['revenue', 'margin', 'conversion']
# Количество бутстрап операций
BS_ITERS = 10000
ALPHA = 0.05

In [5]:
# Рассчитываем количество строк в группе и порядковый номер
window_for_bs_stats = Window.partitionBy(*KEY_COLS)
statistics_df = (
    statistics_df
    .withColumn('rn',
                F.row_number().over(window_for_bs_stats.orderBy('user_id')))
    .withColumn('group_cnt', F.count(F.lit(1)).over(window_for_bs_stats))
    # repartition нужен для того, чтобы дальнейшие действия по разворачиванию происходили на всех экзекьюторах
    # а не ограниченном количестве, соответствующем количеству партиций из window_for_bs_stats
    .repartition('rn')
).cache()

In [6]:
bs_df = (
    statistics_df
    # Для каждой записи в нашем датафрейме генерируем количество строк = bs_iter_cnt
    # Порядковый номер строки будет являться номером итерации
    .select(KEY_COLS + ['group_cnt'])
    .withColumn('iter_num', F.explode(F.sequence(F.lit(1), F.lit(BS_ITERS))))
    # Для каждой строки в итерации случайно выбираем номер записи из изначального датафрейма
    .withColumn('rn', 
                (F.floor(F.rand() * (F.col('group_cnt'))) + 1).cast('int')
                .alias('rn'))
    .select(KEY_COLS + ['iter_num', 'rn'])
)

# Соединяем бутстрап датафрейм с изначальным по случайному идентификатору строки
result_df = (
    bs_df
    .join(
        statistics_df.select(KEY_COLS + STATISTICS_COLS + ['rn']),
        on=KEY_COLS + ['rn'],
        how='inner'
    )
    # Группируем по ключевым полям и номеру итерации
    .groupby(KEY_COLS + ['iter_num'])
    .agg(*[F.avg(stat_col).alias(stat_col) for stat_col in STATISTICS_COLS])
)

In [7]:
# Сохраняем результат и приводим к единому виду с остальными тетрадками
result = result_df.toPandas()
# Минусуем метрики контрольной группы, чтобы сложить можно было для получения разницы
result.loc[result['control_group_flg'] == 1, STATISTICS_COLS] *= -1
# Можно собирать и сразу в pyspark
result = (
    result
    .groupby(['camp_id', 'iter_num'], as_index=False).sum()
    .groupby('camp_id', as_index=False)[STATISTICS_COLS].quantile([ALPHA/2, 1-ALPHA/2])
    .groupby('camp_id', as_index=False).agg(list)
)

result

                                                                                

Unnamed: 0,camp_id,revenue,margin,conversion
0,ED_3755,"[0.16221133363306647, 1.2243871651754177]","[-0.8159217680834554, 0.2912118966302792]","[0.011798244376076017, 0.0679830485921541]"


In [8]:
spark.stop()