In [13]:
# `%pip` (unlike `!pip3`) installs into the environment of the running kernel, and one
# command lets the resolver see all version pins together.
%pip install -q matplotlib==3.7.2 tqdm plotly pandas==1.3.4 pyarrow



In [14]:
from ST_Methods.obf import OBF

In [15]:
import os
import s3fs
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.window import *

# Attach to the active Spark session if there is one (getOrCreate returns it rather
# than building a new session).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# S3 filesystem handle with credentialed (non-anonymous) access.
s3 = s3fs.S3FileSystem(anon=False)

In [16]:
class WriteReader:
    """Round-trip Spark DataFrames through files stored under a common base path.

    Each dataset is addressed by ``name`` and stored at
    ``os.path.join(base_path, name)``.

    Args:
        base_path: Directory/URI prefix under which datasets are stored
            (e.g. an ``s3://`` prefix).
        file_format: Spark data source format used for both writing and reading.
            Defaults to ``"parquet"``, the format this class has always used.
        spark_session: Session used for reading. When ``None`` (the default) the
            notebook-global ``spark`` is looked up at call time.
    """

    def __init__(self, base_path, file_format="parquet", spark_session=None):
        self.base_path = base_path
        self.file_format = file_format
        self.spark_session = spark_session

    def _path(self, name):
        """Return the storage location of dataset ``name``."""
        return os.path.join(self.base_path, name)

    def _session(self):
        """Return the injected session, falling back to the notebook-global ``spark``."""
        return self.spark_session if self.spark_session is not None else spark

    def write_and_read(self, df, name):
        """Write ``df`` to dataset ``name`` (overwriting it) and return it read back.

        The returned DataFrame is loaded from the files just written, not ``df`` itself.
        """
        df.write.mode("overwrite").format(self.file_format).save(self._path(name))
        return self.read(name)

    def read(self, name):
        """Load dataset ``name`` using the configured format and return the DataFrame."""
        return self._session().read.format(self.file_format).load(self._path(name))

def ls(path):
    """Render the ``dbutils.fs.ls`` listing of ``path`` with ``display``."""
    listing = dbutils.fs.ls(path)
    display(listing)


def rm(path):
    """Delete ``path`` via ``dbutils.fs.rm``, passing ``True`` as the recurse flag."""
    recursive = True
    dbutils.fs.rm(path, recursive)

# Base S3 prefix holding the sample datasets used in this notebook.
path = "s3://pa-ai-datascience-storage-dev/users/ub/edgar.davtyan/Tasks/Research on AI Gen/Interleaving/PackageScripts/TestData/"

write_reader_parquet = WriteReader(path)
# NOTE(review): despite its name, this reader is constructed exactly like
# `write_reader_parquet` (same path, same default format), and it is not used in
# any visible cell.
write_reader_delta = WriteReader(path)

# Short aliases for the parquet reader's write-then-read and read helpers.
wr, rr = write_reader_parquet.write_and_read, write_reader_parquet.read

In [17]:
# Sample datasets from the TestData prefix. The "_aa" dataset is narrowed to the
# columns below; it is not used by any visible cell.
ai_apply_click_aa = (
    rr("sample_data_apply_click_aa")
    .select("device_id", "sid", "keyword", "time_hours", "photo_id")
)
ai_apply_click = rr("sample_data_apply_click")

# Passed to OBF as `sid_columns` and `grouping_column`.
sid = ["sid", "keyword"]
grouping = "time_hours"

[Stage 2:>                                                          (0 + 1) / 1]

In [None]:
# Compare stable_diffusion_v22 (treatment) against stable_diffusion_v21 (control);
# the variant of each row is read from the `sd_version` column.
OBF_tester = OBF(
    sid_columns=sid,
    grouping_column=grouping,
    treatment_variant="stable_diffusion_v22",
    control_variant="stable_diffusion_v21",
    item_id_column="photo_id",
    variant_column="sd_version",
)

In [None]:
# Enable Arrow-based conversion between Spark and pandas DataFrames.
spark.conf.set(
    "spark.sql.execution.arrow.pyspark.enabled",
    "true",
)

In [None]:
# Stopping threshold for 45 planned measurements, estimated over 100,000 iterations.
threshold = OBF_tester.calculate_threshold(
    number_of_measurements=45,
    iterations=100_000,
)

In [None]:
# Feed the experiment data to the tester. run_test below is called without a data
# argument, so transform presumably keeps the prepared data on the tester — confirm
# in ST_Methods.obf. The returned DataFrame is the cell output.
OBF_tester.transform(ai_apply_click)

In [None]:
# Run the sequential test against the threshold computed above.
results = OBF_tester.run_test(
    O_i_threshold=threshold,
)

In [None]:
# Visualize the run_test results.
OBF_tester.visualize(results)

In [None]:
from pyspark.sql.types import TimestampType

# End-to-end usage example of the OBF workflow.
# TODO(review): `data_dir_1` and `data_dir_2` are not defined in any visible cell;
# set them to the parquet locations of the threshold and experiment data first.
# NOTE(review): `threshold_data` is loaded but not used below.
threshold_data = (
    spark.read.format("parquet").load(data_dir_1)
    .select("sid", "keyword", "time_hours", "photo_id")
)

experiment_data = (
    spark.read.format("parquet").load(data_dir_2)
    .select("sid", "keyword", "time_hours", "photo_id", "variant")
)

sid = ["sid", "keyword"]
grouping = "time_hours"
control = "A"
treatment = "B"
item_id = "photo_id"
variant_column = "variant"

# Keyword names match the other OBF(...) calls in this notebook: `sid_columns`
# (a list of columns) and `grouping_column` (a single column).
obf_test = OBF(sid_columns=sid,
               grouping_column=grouping,
               treatment_variant=treatment,
               control_variant=control,
               item_id_column=item_id,
               variant_column=variant_column)

threshold = obf_test.calculate_threshold(number_of_measurements=45,
                                         iterations=100000)

obf_test.transform(experiment_data)

# run_test takes the threshold as `O_i_threshold`, as in the other run_test calls.
results = obf_test.run_test(O_i_threshold=threshold)

obf_test.visualize(results)

Testing the OBF methods on simulated data

In [18]:
import unittest
from datetime import datetime

from ST_Methods.obf import OBF
from pyspark.sql import SparkSession

import pandas as pd
import numpy as np

# getOrCreate returns the already-active session if one exists.
spark = (SparkSession
         .builder
         .getOrCreate())

In [19]:
def _simulate_variants_pdf(start_time, end_time, mean_rows_per_hour,
                           stddev_rows_per_hour, unique_sids, p_a, seed):
    """Build the simulated hourly A/B log as a pandas DataFrame (see create_variants)."""
    columns = ['hours', 'sid_1', 'sid_2', 'item_id', 'variant']

    # Legacy global seeding (not np.random.default_rng) keeps the random stream, and
    # therefore the generated data, identical to earlier runs with the same seed.
    # Note that it resets numpy's global RNG state as a side effect.
    np.random.seed(seed)

    # Consecutive hours, both endpoints included. pd.offsets.Hour() avoids the
    # deprecated 'H' alias.
    hours = pd.date_range(start=pd.Timestamp(start_time), end=pd.Timestamp(end_time),
                          freq=pd.offsets.Hour())

    # Identifier pools that rows sample from. They are drawn with replacement,
    # so a pool may contain duplicates.
    sid_1 = np.random.randint(100, 1000, size=unique_sids)
    sid_2 = np.random.randint(1000, 2000, size=unique_sids)

    # item_id starts at a 15-digit number and increases by one per row across all hours.
    item_id_start = 10 ** 14
    batches = []
    for hour in hours:
        # A negative draw would make np.random.choice raise; treat it as an empty hour.
        num_rows = max(0, int(np.random.normal(loc=mean_rows_per_hour,
                                                scale=stddev_rows_per_hour)))
        # Dict order matters: the draws below run in the same order as before.
        batches.append(pd.DataFrame({
            'hours': [hour] * num_rows,
            'sid_1': np.random.choice(sid_1, num_rows),
            'sid_2': np.random.choice(sid_2, num_rows),
            'item_id': np.arange(item_id_start, item_id_start + num_rows),
            'variant': np.random.choice(['A', 'B'], num_rows, p=[p_a, 1 - p_a]),
        }))
        item_id_start += num_rows

    if not batches:
        # Empty hour range: return the column-only frame, as the original did.
        return pd.DataFrame(columns=columns)
    # A single concat avoids quadratic copying. It also keeps numeric/datetime dtypes
    # that concatenating onto an empty object-dtype frame would upcast to object.
    return pd.concat(batches, ignore_index=True)


def create_variants(start_time="2023-06-05 06:00:00", end_time="2023-06-05 23:00:00",
                    mean_rows_per_hour=1150, stddev_rows_per_hour=240, unique_sids=2000,
                    p_a=0.45, seed=42, spark_session=None):
    """Simulate an hourly A/B impression log and return it as a Spark DataFrame.

    Calling with no arguments reproduces the notebook's original dataset.

    Args:
        start_time, end_time: Inclusive bounds of the hourly range (anything
            ``pd.Timestamp`` accepts).
        mean_rows_per_hour, stddev_rows_per_hour: Normal-distribution parameters
            for the number of rows per hour (negative draws are clamped to 0).
        unique_sids: Size of each ``sid_1`` / ``sid_2`` identifier pool. Pools are
            drawn with replacement, so they may hold fewer distinct values.
        p_a: Probability that a row is assigned variant ``'A'`` (else ``'B'``).
        seed: Seed for numpy's global RNG. This resets the global RNG state.
        spark_session: Session used for the conversion. When ``None``, the
            notebook-global ``spark`` is used.

    Returns:
        Spark DataFrame with columns ``hours``, ``sid_1``, ``sid_2``, ``item_id``
        (unique, consecutive from 10**14) and ``variant``.
    """
    simulated_data = _simulate_variants_pdf(start_time, end_time, mean_rows_per_hour,
                                            stddev_rows_per_hour, unique_sids, p_a, seed)
    session = spark_session if spark_session is not None else spark
    return session.createDataFrame(simulated_data)

In [20]:
# Tester for the simulated data: variant "B" is treatment and "A" is control.
# Rows are keyed by (sid_1, sid_2) and grouped by the hourly `hours` column.
OBF_tester = OBF(
    control_variant="A",
    treatment_variant="B",
    variant_column="variant",
    item_id_column="item_id",
    grouping_column="hours",
    sid_columns=["sid_1", "sid_2"],
)

In [21]:
# Same threshold settings as for the real data: 45 measurements, 100,000 iterations.
threshold = OBF_tester.calculate_threshold(number_of_measurements=45, iterations=100_000)

100%|██████████| 100000/100000 [00:01<00:00, 80204.16it/s]


In [22]:
# Simulate data, pass it through the tester's transform, and print the first rows.
(
    OBF_tester
    .transform(create_variants())
    .show()
)

View job details at https://picsart-dev.cloud.databricks.com/?o=0#/setting/clusters/0219-083901-krfv8a0u/sparkUi


[Stage 5:>                  (0 + 8) / 8][Stage 7:>                  (0 + 0) / 1]

+-----+-----+-------------------+-------+---------+-------+------------+-----+
|sid_1|sid_2|              hours|variant|tr_clicks|variant|cntrl_clicks|score|
+-----+-----+-------------------+-------+---------+-------+------------+-----+
|  100| 1045|2023-06-05 08:00:00|   null|        0|      A|           1|   -1|
|  100| 1052|2023-06-05 09:00:00|      B|        1|   null|           0|    1|
|  100| 1065|2023-06-05 11:00:00|   null|        0|      A|           1|   -1|
|  100| 1076|2023-06-05 21:00:00|      B|        1|   null|           0|    1|
|  100| 1086|2023-06-05 22:00:00|   null|        0|      A|           1|   -1|
|  100| 1131|2023-06-05 11:00:00|      B|        1|   null|           0|    1|
|  100| 1142|2023-06-05 22:00:00|      B|        1|   null|           0|    1|
|  100| 1143|2023-06-05 09:00:00|      B|        1|   null|           0|    1|
|  100| 1145|2023-06-05 20:00:00|      B|        1|   null|           0|    1|
|  100| 1146|2023-06-05 06:00:00|   null|        0| 

In [23]:
# Run the sequential test with the threshold computed above. `results` is a Spark
# DataFrame, and its repr is the cell output.
results = OBF_tester.run_test(
    O_i_threshold=threshold,
)
results

DataFrame[hours: timestamp, T_i_sngl: bigint, m_i_sngl: bigint, D_i: double, i: int, T_i: bigint, m_i: bigint, m_i_pow2: double, O_i: double, O_i*i: double, threshold: double, can_stop: boolean]

In [24]:
# Visualize the run_test results for the simulated experiment.
OBF_tester.visualize(results)

[Stage 20:>                                                         (0 + 1) / 1]

View job details at https://picsart-dev.cloud.databricks.com/?o=0#/setting/clusters/0219-083901-krfv8a0u/sparkUi


                                                                                

View job details at https://picsart-dev.cloud.databricks.com/?o=0#/setting/clusters/0219-083901-krfv8a0u/sparkUi
