In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from matplotlib_inline.backend_inline import set_matplotlib_formats
# Request SVG output for inline matplotlib figures.
set_matplotlib_formats('svg')

# Apply seaborn's default theme first; the two calls below then override
# its axes style and colour palette.
sns.set_theme()

# Use the "whitegrid" axes style.
sns.set_style("whitegrid")

# Use the "tab10" colour palette.
sns.set_palette("tab10")

import glob
import json
import os

- Data of the control group are expected to be in `data/control/*.json`
- Data of the treatment group are expected to be in `data/*.json`

In [None]:
def load_trials(pattern='data/*.json'):
    """Load trial JSON files into a pandas DataFrame.

    Every file matching ``pattern`` is parsed as one trial and becomes one
    row. Nested JSON objects are flattened by ``pd.json_normalize`` into
    dot-separated column names.

    Args:
        pattern: Glob pattern selecting the trial files.

    Returns:
        A DataFrame with one row per file, ordered by file path, with an
        extra ``filename`` column holding the path the row was read from and
        the ``date`` column converted to datetime.

    Raises:
        ValueError: If no file matches ``pattern``.
        KeyError: If the loaded trials have no ``date`` field.
    """
    trials = []
    # glob.glob() yields paths in arbitrary, platform-dependent order. Sorting
    # keeps the row order (and any positional lookup such as ``iloc[0]``)
    # stable between runs and machines.
    for path in sorted(glob.glob(pattern)):
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(path, encoding='utf-8') as f:
            trial = json.load(f)
        trial['filename'] = path
        trials.append(trial)

    if not trials:
        raise ValueError('No trials found')

    df = pd.json_normalize(trials)

    # convert to datetime
    df['date'] = pd.to_datetime(df['date'])

    return df

Use a lenient definition of success for now.
We accept trials whose final state is `Served` or `BowlWithCerealAndMilk`, since the task is not explicitly about serving the meal.

In [None]:
def succeeded(trial):
    """Tell whether a trial ended in one of the accepted final states.

    ``trial`` is anything indexable by the key
    ``'analysis.final_state_name'`` (for example a DataFrame row).
    """
    accepting_states = ('Served', 'BowlWithCerealAndMilk')
    final_state = trial['analysis.final_state_name']
    return final_state in accepting_states

# Load control data

In [None]:
# Read every control-group trial, then add a boolean `succeeded` column
# computed row by row from the trial's final state.
control_df = load_trials(pattern='data/control/*.json')
control_df = control_df.assign(succeeded=control_df.apply(succeeded, axis=1))
control_df

In [None]:
# Histogram of the boolean outcome for the control group
# (False -> 0.0, True -> 1.0).
# `range=(0, 1)` pins the two bins to [0, 0.5) and [0.5, 1], so the bar
# centres sit on the 0.25 / 0.75 tick positions used below. Without it the
# bin edges are derived from the data, and when every trial has the same
# outcome the bars no longer line up with the 'false' / 'true' labels.
control_df['succeeded'].astype(float).hist(bins=2, range=(0, 1), align='mid', rwidth=0.5)
# false/true labels
plt.xticks([0.25, 0.75], ['false', 'true'])
plt.xlabel('reached accepting state')
plt.ylabel('count')
# The control and treatment histograms are otherwise indistinguishable.
plt.title('control group');

# Load new (treatment) trials

In [None]:
# Read every treatment-group trial, then add a boolean `succeeded` column
# computed row by row from the trial's final state.
treatment_df = load_trials(pattern='data/*.json')
treatment_df = treatment_df.assign(succeeded=treatment_df.apply(succeeded, axis=1))
treatment_df

In [None]:
# Histogram of the boolean outcome for the treatment group
# (False -> 0.0, True -> 1.0).
# `range=(0, 1)` pins the two bins to [0, 0.5) and [0.5, 1], so the bar
# centres sit on the 0.25 / 0.75 tick positions used below. Without it the
# bin edges are derived from the data, and when every trial has the same
# outcome the bars no longer line up with the 'false' / 'true' labels.
treatment_df['succeeded'].astype(float).hist(bins=2, range=(0, 1), align='mid', rwidth=0.5)
# false/true labels
plt.xticks([0.25, 0.75], ['false', 'true'])
plt.xlabel('reached accepting state')
plt.ylabel('count')
# The control and treatment histograms are otherwise indistinguishable.
plt.title('treatment group');


In [None]:
# Token usage per treatment trial:
#   x     -> analysis.tokens.prompt_tokens
#   y     -> analysis.tokens.completion_tokens
#   color -> succeeded (True: green, False: red)
#   size  -> analysis.tokens.total_tokens

sns.scatterplot(
    data=treatment_df,
    x='analysis.tokens.prompt_tokens',
    y='analysis.tokens.completion_tokens',
    hue='succeeded',
    size='analysis.tokens.total_tokens',
    sizes=(10, 100),
    alpha=0.8,
    # Map colours by hue value rather than by position. A two-element list
    # is matched positionally against the hue levels present in the data, so
    # when only one outcome occurs it is either rejected or the single level
    # takes the first colour ('red') even if that level is True.
    palette={False: 'red', True: 'green'},
);




In [None]:
# Keep only the treatment trials whose `succeeded` flag is False.
failed_mask = treatment_df['succeeded'].eq(False)
failed_trials_df = treatment_df.loc[failed_mask]
failed_trials_df

Let's explore one of the trials that didn't reach an accepting state.

In [None]:
# Path of the first failed trial (position 0 in the filtered frame).
failed_trial_filename = failed_trials_df['filename'].iloc[0]
failed_trial_filename

In [None]:
# Build a separate frame indexed by file path so a trial can be looked up by
# its filename; `treatment_df` itself is left untouched.
df_filename = treatment_df.set_index('filename')

df_filename.loc[failed_trial_filename]

In [None]:
for evt in df_filename.loc[failed_trial_filename]['trace.events']:
    match evt['type']:
        case 'Start':
            print(f"==== [start] task: {evt['task']}")
        case 'End':
            print(f"==== [end] reason = {evt}")
        case 'ToolInvocationSucceeded':
            print(f"====[tool] tool = {evt['tool_name']}")
            print("==[tool] input = \n", "\n".join(evt['assistant_message']))
            print("==[tool] output = \n", "\n".join(evt['result']['output']))
        case 'ToolInvocationFailed':
            print(f"====[tool] tool = {evt['tool_name']}")
            print("==[tool] input = \n", "\n".join(evt['tool_input']))
            print("==[tool] error = \n", "\n".join(evt['error']))        
        case _:
            print(f"==== [other] {evt}")
       


# Bayesian A/B testing

This section follows the PyMC case study "Introduction to Bayesian A/B Testing": https://www.pymc.io/projects/examples/en/latest/case_studies/bayesian_ab_testing_introduction.html

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Union

import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymc as pm

from scipy.stats import bernoulli, expon

In [None]:
# Fixed seed and a NumPy generator built from it, for reproducible random draws.
RANDOM_SEED = 4000
rng = np.random.default_rng(RANDOM_SEED)

# %config InlineBackend.figure_format = 'retina'
# Switch plotting to ArviZ's "arviz-darkgrid" style.
az.style.use("arviz-darkgrid")

# Keyword arguments shared by the az.plot_posterior calls in this notebook.
plotting_defaults = dict(
    bins=50,
    kind="hist",
    textsize=10,
)

In [None]:
@dataclass
class BetaPrior:
    """Parameters of the Beta prior placed on each variant's success probability."""

    # Forwarded as `alpha` to pm.Beta in ConversionModelTwoVariant.create_model.
    alpha: float
    # Forwarded as `beta` to pm.Beta in ConversionModelTwoVariant.create_model.
    beta: float

In [None]:
@dataclass
class BinomialData:
    """Observed outcome counts for a single variant."""

    # Number of trials; used as `n` of the Binomial likelihood.
    trials: int
    # Number of successful trials; used as the observed value of the likelihood.
    successes: int

In [None]:
class ConversionModelTwoVariant:
    """Beta-Binomial success-rate model comparing exactly two variants."""

    def __init__(self, priors: BetaPrior):
        """Store the Beta prior that is applied to both variants."""
        self.priors = priors

    def create_model(self, data: List[BinomialData]) -> pm.Model:
        """Build the PyMC model for the observed counts.

        Args:
            data: Counts per variant. The model variables are declared with
                ``shape=2``, so two entries are expected; the first is the
                baseline of the relative uplift.

        Returns:
            A model containing ``p`` (success probability per variant), the
            Binomial likelihood ``y`` and the deterministic ``reluplift_b``
            defined as ``p[1] / p[0] - 1``.
        """
        n_trials = [variant.trials for variant in data]
        n_successes = [variant.successes for variant in data]

        model = pm.Model()
        with model:
            # Variables register themselves with the active model context,
            # so only `p` needs a local name.
            p = pm.Beta("p", alpha=self.priors.alpha, beta=self.priors.beta, shape=2)
            pm.Binomial("y", n=n_trials, p=p, shape=2, observed=n_successes)
            pm.Deterministic("reluplift_b", p[1] / p[0] - 1)
        return model

In [None]:
def prepare_data(control_df, treatment_df):
    """Summarise both groups as integer trial/success counts.

    Args:
        control_df: Trials of the control group; needs a ``succeeded`` column.
        treatment_df: Trials of the treatment group; needs a ``succeeded``
            column.

    Returns:
        A DataFrame with columns ``control`` and ``treatment`` and rows
        ``trials`` (number of non-missing outcomes) and ``successes`` (number
        of successful outcomes), stored as integers.
    """

    def _counts(trials_df):
        """Return ``[trials, successes]`` for one group as plain ints."""
        outcomes = trials_df['succeeded'].astype(float)
        # count() and sum() both skip missing values. The explicit int() keeps
        # the counts integral; they later populate BinomialData's int fields.
        return [int(outcomes.count()), int(outcomes.sum())]

    return pd.DataFrame(
        {
            'control': _counts(control_df),
            'treatment': _counts(treatment_df),
        },
        index=['trials', 'successes'],
    )

In [None]:
# Summarise both groups as trial/success counts (rows) per variant (columns).
data = prepare_data(control_df, treatment_df)
data
    

In [None]:
def run_scenario_twovariant(
    raw_data: pd.DataFrame,
    weak_prior: BetaPrior,
    strong_prior: BetaPrior,
    random_seed: int | None = None,
) -> tuple[az.InferenceData, az.InferenceData]:
    """Fit the two-variant model under two priors and plot the relative uplift.

    Args:
        raw_data: One column per variant (exactly two; the first is the
            baseline) with rows labelled ``trials`` and ``successes``, since
            each column is unpacked into ``BinomialData``.
        weak_prior: Beta prior for the first fit (top panel).
        strong_prior: Beta prior for the second fit (bottom panel).
        random_seed: Seed passed to ``pm.sample``. Defaults to the
            notebook-level ``RANDOM_SEED`` so that repeated runs produce the
            same posterior samples.

    Returns:
        ``(trace_weak, trace_strong)``: the sampling results for the weak and
        the strong prior, in that order.
    """
    seed = RANDOM_SEED if random_seed is None else random_seed

    variants = raw_data.columns
    assert len(variants) == 2, "exactly two variants (columns) are required"

    data = [BinomialData(**raw_data[v].to_dict()) for v in variants]
    priors = (weak_prior, strong_prior)

    traces = []
    for prior in priors:
        with ConversionModelTwoVariant(priors=prior).create_model(data):
            traces.append(pm.sample(draws=5000, random_seed=seed))
    trace_weak, trace_strong = traces

    # One panel per prior; the red line at 0 marks "no uplift".
    fig, axs = plt.subplots(2, 1, figsize=(7, 7), sharex=True)
    for ax, prior, trace in zip(axs, priors, traces):
        az.plot_posterior(trace.posterior["reluplift_b"], ax=ax, **plotting_defaults)
        ax.set_title(f"{prior}", fontsize=10)
        ax.axvline(x=0, color="red")
    fig.suptitle(f"{variants[1]} vs. {variants[0]} Rel Uplift")
    return trace_weak, trace_strong

In [None]:
# # test with fake data
#
# weak_prior = ConversionModelTwoVariant(BetaPrior(alpha=100, beta=100))
# strong_prior = ConversionModelTwoVariant(BetaPrior(alpha=1000, beta=1000))

# with weak_prior.create_model(data=[BinomialData(1,1), BinomialData(1,1)]):
#     weak_prior_predictive = pm.sample_prior_predictive(samples=10000, return_inferencedata=False)

# with strong_prior.create_model(data=[BinomialData(1,1), BinomialData(1,1)]):
#     strong_prior_predictive = pm.sample_prior_predictive(samples=10000, return_inferencedata=False)


# fig, axs = plt.subplots(2, 1, figsize=(7, 7), sharex=True)
# az.plot_posterior(weak_prior_predictive["reluplift_b"], ax=axs[0], **plotting_defaults)
# axs[0].set_title(f"B vs. A Rel Uplift Prior Predictive, {weak_prior.priors}", fontsize=10)
# axs[0].axvline(x=0, color="red")
# az.plot_posterior(strong_prior_predictive["reluplift_b"], ax=axs[1], **plotting_defaults)
# axs[1].set_title(f"B vs. A Rel Uplift Prior Predictive, {strong_prior.priors}", fontsize=10)
# axs[1].axvline(x=0, color="red");

In [None]:
# Fit the model twice - once with a weakly and once with a strongly
# concentrated Beta prior (both centred on 0.5) - and keep both traces.
weak_prior = BetaPrior(alpha=100, beta=100)
strong_prior = BetaPrior(alpha=10000, beta=10000)
trace_weak, trace_strong = run_scenario_twovariant(
    data,
    weak_prior=weak_prior,
    strong_prior=strong_prior,
)

$\textrm{relative\_uplift} = (\textrm{treatment} - \textrm{control}) / \textrm{control}$