In [1]:
import json
import pandas as pd
import numpy as np
from fractions import Fraction


# Input JSON File

In [2]:
# Note: every index in this notebook (urns, colours, draws) starts from 1.

# ---- Fixed parameters ----
balls = 10                       # number of balls shown in each urn
name_cols = ['B','W','P','G']    # colour labels, in the order col1..col4
name_urns = ['A','B','C','D']    # urn labels, in the order u1..u4

# Optional reproducibility hook: leave as None to get fresh random draws on
# every run (the original behaviour), or set to an int to make every
# np.random-based draw in this notebook repeatable.
random_seed = None
if random_seed is not None:
    np.random.seed(random_seed)

# ---- Experimental design ----
# Key: (number of urns, number of colours). At most 4 urns and 4 colours.
# Value: one row per urn (u1, u2, ...); each row lists P(colour | urn) for
# col1, col2, ... in that order.
_urn_colour_probs = {
    (2, 2): [[0.7, 0.3],
             [0.3, 0.7]],
    (2, 3): [[0.7, 0.2, 0.1],
             [0.2, 0.1, 0.7]],
    (2, 4): [[0.4, 0.3, 0.2, 0.1],
             [0.1, 0.2, 0.3, 0.4]],
    (3, 2): [[0.7, 0.3],
             [0.5, 0.5],
             [0.3, 0.7]],
    (3, 3): [[0.7, 0.2, 0.1],
             [0.1, 0.7, 0.2],
             [0.2, 0.1, 0.7]],
    (3, 4): [[0.4, 0.3, 0.2, 0.1],
             [0.3, 0.2, 0.2, 0.3],
             [0.1, 0.2, 0.3, 0.4]],
    (4, 2): [[0.7, 0.3],
             [0.6, 0.4],
             [0.4, 0.6],
             [0.3, 0.7]],
    (4, 3): [[0.7, 0.2, 0.1],
             [0.1, 0.7, 0.2],
             [0.2, 0.1, 0.7],
             [0.3, 0.4, 0.3]],
    (4, 4): [[0.4, 0.3, 0.2, 0.1],
             [0.3, 0.4, 0.1, 0.2],
             [0.2, 0.1, 0.4, 0.3],
             [0.1, 0.2, 0.3, 0.4]],
}

# Flatten the matrices into the dict-of-dicts used by the rest of the notebook:
# ball_comp[(urns, colours)]['p_col{c}_u{u}'] = P(colour c | urn u).
# Keys are inserted urn by urn, colour by colour.
ball_comp = {
    design: {
        f"p_col{col}_u{urn}": probability
        for urn, row in enumerate(rows, start=1)
        for col, probability in enumerate(row, start=1)
    }
    for design, rows in _urn_colour_probs.items()
}

# ---- Floating parameters ----
max_urns = 4
max_colours = 4
max_draws = 3
num_trials_per_comp = 2


def _validate_design():
    """Fail fast if the design or the parameters above are inconsistent."""
    for (n_urns, n_cols), rows in _urn_colour_probs.items():
        assert len(rows) == n_urns, f"design {(n_urns, n_cols)} must have {n_urns} urn rows"
        for row in rows:
            assert len(row) == n_cols, f"design {(n_urns, n_cols)} must have {n_cols} colours per urn"
            assert abs(sum(row) - 1) < 1e-9, f"probabilities {row} must sum to 1"
            # each probability must correspond to a whole number of balls
            assert all(abs(p * balls - round(p * balls)) < 1e-9 for p in row), \
                f"probabilities {row} do not give whole ball counts for balls={balls}"
    assert max_urns <= len(name_urns), "not enough urn names for max_urns"
    assert max_colours <= len(name_cols), "not enough colour names for max_colours"
    for n_urns in range(2, max_urns + 1):
        for n_cols in range(2, max_colours + 1):
            assert (n_urns, n_cols) in ball_comp, f"no ball composition for design {(n_urns, n_cols)}"


_validate_design()


In [3]:
def simulate_ball_draws( k, j, n, ball_comp, name_cols, name_urns, rng=None):
    """
    Pick one urn at random and simulate a sequence of ball draws from it.

    The urn is chosen uniformly among the first k urn names; the n balls are
    then drawn with replacement using that urn's colour probabilities.

    Parameters:
        k (int): total number of urns.
        j (int): total number of colours.
        n (int): length of the draw sequence.
        ball_comp (dict): dict of dicts. The outer key is (number of urns,
            number of colours); the inner key is 'p_col{c}_u{u}' and the value
            is the probability of colour c given urn u.
        name_cols (list): colour labels; the first j are used.
        name_urns (list): urn labels; the first k are used.
        rng (optional): object with a numpy-style ``choice`` method, e.g.
            ``np.random.default_rng(seed)``. Defaults to the global
            ``np.random`` state, as before.

    Returns:
        tuple: (chosen urn label as str, numpy array of n drawn colour labels).

    Raises:
        ValueError: if fewer than k urn names or j colour names are supplied.
        KeyError: if ball_comp has no entry for (k, j).
    """
    if k > len(name_urns):
        raise ValueError(f"need {k} urn names but only {len(name_urns)} were given")
    if j > len(name_cols):
        raise ValueError(f"need {j} colour names but only {len(name_cols)} were given")

    # fall back to the legacy global random state when no generator is injected
    rng = np.random if rng is None else rng

    urns = list(name_urns[:k])
    colors = list(name_cols[:j])

    # choose the urn by position so the 1-based urn number is known directly
    chosen_urn_index = int(rng.choice(len(urns)))
    chosen_urn = urns[chosen_urn_index]

    # conditional colour probabilities of the chosen urn, ordered col1..colj
    probabilities_dict = ball_comp[(k, j)]
    probabilities = [
        probabilities_dict[f"p_col{col}_u{chosen_urn_index + 1}"]
        for col in range(1, j + 1)
    ]

    # n draws with replacement from the chosen urn
    return (chosen_urn, rng.choice(colors, size=n, p=probabilities))

In [4]:
def create_urnTable_json(trial, urns, colours, max_draws, balls, ball_comp, name_cols, name_urns):
    """
    Build the JSON-ready description of one trial: the urn table plus a
    simulated chosen urn and draw sequence. An uninformative (uniform) prior
    over urns is used.

    trial: trial number or trial name; the result is keyed by f"BU{trial}"
    urns: number of urns
    colours: number of colours
    max_draws: number of balls to draw in the simulated sequence
    balls: number of balls in each urn
    ball_comp: dictionary with the ball composition, keyed by (urns, colours)
    name_cols: list with the names of the colours
    name_urns: list with the names of the urns

    Returns a dict {f"BU{trial}": {"urnInfo": [...], "chosenUrn": ..., "ballDraws": [...]}}.
    "urnInfo" holds one entry per urn followed by a 'Total' entry.
    """

    # uniform prior over urns, written as a fraction string such as '1/3'
    prior = str(Fraction(1, urns))

    composition = ball_comp[(urns, colours)]

    def ball_count(urn_number, colour_number):
        # Round rather than truncate: a product such as 0.29 * 100 evaluates to
        # 28.999999999999996, which int() alone would turn into 28.
        probability = composition[f"p_col{colour_number}_u{urn_number}"]
        return int(round(probability * balls))

    urn_numbers = range(1, urns + 1)
    colour_numbers = range(1, colours + 1)

    # one entry per urn, e.g. composition ['7B', '3W']
    urn_entries = [
        {
            'urnName': name_urns[u - 1],
            'prior': prior,
            'composition': [f"{ball_count(u, c)}{name_cols[c - 1]}" for c in colour_numbers],
            'balls': balls,
        }
        for u in urn_numbers
    ]

    # the 'Total' entry aggregates every colour across all urns
    urn_entries.append({
        'urnName': 'Total',
        'prior': '1',
        'composition': [
            f"{sum(ball_count(u, c) for u in urn_numbers)}{name_cols[c - 1]}"
            for c in colour_numbers
        ],
        'balls': balls * urns,
    })

    chosen_urn, sequence = simulate_ball_draws(k=urns,j=colours,n=max_draws,ball_comp=ball_comp,name_cols=name_cols,name_urns=name_urns)

    # store plain Python strings so the result serialises cleanly with json
    trial_info = {
        'urnInfo': urn_entries,
        'chosenUrn': str(chosen_urn),
        'ballDraws': [str(ball) for ball in sequence],
    }

    # wrap under the trial key, e.g. 'BU1' or 'BUPractice'
    return {f"BU{trial}": trial_info}



In [5]:
# Build the main experiment: every (urns, colours) design from 2 up to the
# configured maxima, repeated num_trials_per_comp times, numbered from 1.
trials_lis = []
trial_num = 1

for urn, colour in (
    (n_urns, n_colours)
    for n_urns in range(2, max_urns + 1)
    for n_colours in range(2, max_colours + 1)
    for _repeat in range(num_trials_per_comp)
):
    urnTable = create_urnTable_json(
        trial=trial_num,
        urns=urn,
        colours=colour,
        max_draws=max_draws,
        balls=balls,
        ball_comp=ball_comp,
        name_cols=name_cols,
        name_urns=name_urns,
    )
    trials_lis.append(urnTable)
    trial_num += 1

In [6]:
# Manually create the practice trial so that it has a different urn
# composition from the main-experiment trials.
ball_comp_Practice = {(2,2):{'p_col1_u1':0.6, 'p_col2_u1':0.4, 'p_col1_u2':0.4, 'p_col2_u2':0.6}}

urnTable_Practice = create_urnTable_json(trial="Practice",urns=2, colours=2, max_draws=max_draws, balls=balls, ball_comp=ball_comp_Practice, name_cols=name_cols, name_urns=name_urns)

# Replace any practice trial left over from an earlier run of this cell instead
# of blindly appending, so that re-running the cell cannot put two
# 'BUPractice' entries (with different draws) into the trial list.
trials_lis = [trial for trial in trials_lis if "BUPractice" not in trial] + [urnTable_Practice]

In [7]:
# Convert the list of trials to a JSON string
urn_entries_json = json.dumps(trials_lis, indent=4)

# Write the JSON string to the experiment input file.
# The encoding is given explicitly so the file does not depend on the
# platform's default locale encoding.
filename = 'input.json'
with open(filename, 'w', encoding='utf-8') as json_file:
    json_file.write(urn_entries_json)

# Output JSON File

In [8]:
def bayesian_Updating_Urns(num_urns, num_col, ball_draws, ball_comp = dict(), ndigits=2):
    '''
    Sequentially update the urn posteriors with Bayes' rule, one ball draw at a time,
    starting from a uniform prior over urns.

    num_urns: number of urns in the experiment
    num_col: number of ball colours in the experiment
    ball_draws: a list of ball draw event outcomes. e.g. ['col1','col2']
    ball_comp: a dictionary of ball compositions in each urn. Key: (num_urns, num_col), Values: a dictionary with keys being name of the conditional probability (e.g. p_col1_u1 = probability of colour 1 conditional on urn 1) and values being the conditional probability
    ndigits: number of decimal places the posteriors are rounded to (default 2); pass None to get the unrounded values

    output: a dictionary {"posterior_u{urn}_draw{draw}": posterior} holding the posterior of every urn after every draw
    raises: ZeroDivisionError if a drawn colour has zero probability under every urn that still has positive prior
    '''
    assert isinstance(num_urns, int), "num_urns must be integer"
    assert isinstance(num_col, int), "num_col must be integer"
    assert isinstance(ball_draws, list), "ball_draws must be a list of events"
    assert isinstance(ball_comp, dict), "ball_comp must be a pre-defined dictionary of ball compositions"

    urn_numbers = range(1, num_urns + 1)

    # initialize priors
    # directly told participants so no basic operations needed in the first draw
    # In subsequent draws, the prior equals the previously calculated posterior, so no basic operations needed
    # However, a side note: space complexity is added since people need to remember and keep track
    priors = {urn: 1 / num_urns for urn in urn_numbers}

    # get ball composition probabilities - Likelihood
    # num_urns * num_col basic operations in the worst case when you need to keep track of all colours
    # num_urns basic operations in the best case when only 1 colour is drawn in the sequential sampling
    relevant_p = ball_comp[(num_urns, num_col)]

    all_posteriors = {}
    # Sequential Ball Draws using Bayesian Updating
    for draw, event in enumerate(ball_draws, start=1):
        # prior * likelihood of the observed colour, for every urn
        joint = {urn: priors[urn] * relevant_p[f"p_{event}_u{urn}"] for urn in urn_numbers}

        # calculate/update evidence
        # Only the evidence of the colour actually drawn enters the posterior, so that is all that is computed here.
        # Operation-count notes for a person tracking every colour by hand:
        # ignoring summation, (num_urns-1) * num_col basic operations in the worst case
        # not ignoring summation, 2 * num_urns * num_col - 1 calculations in the worst case
        # In the best case, you can count the balls and make 1 division (BO) per colour in the first round only, so that's in total n_cols - 1 BO
        evidence = 0
        for urn in urn_numbers:
            evidence += joint[urn]

        if evidence == 0:
            raise ZeroDivisionError(
                f"draw {draw} ({event!r}) has zero probability under every urn; the posterior is undefined"
            )

        # calculate posterior
        # 2 * (num_urns - 1) basic operations in the worst case since the posteriors of all urns sum to 1
        # if we count summation/subtraction, basic operations are 2 * num_urns
        for urn in urn_numbers:
            posterior = joint[urn] / evidence
            all_posteriors[f"posterior_u{urn}_draw{draw}"] = posterior
            # the posterior becomes the prior for the next draw
            priors[urn] = posterior

    # worst case complexity, NEEDS WORK (formula kept for reference, not evaluated):
    # num_urns * num_col + len(ball_draws) * ((num_urns-1) * num_col + 2 * (num_urns - 1)) basic operations

    if ndigits is None:
        return all_posteriors
    return {key: round(value, ndigits) for key, value in all_posteriors.items()}

In [9]:
def ColGuess(num_urns, num_col, max_draws, urn_posteriors=dict(), ball_comp=dict(), ndigits=2):
    '''
    Given the urn posteriors and urn composition, use the weighted average method to predict the colour of the next ball draw.
    For every draw and every colour: sum over urns of (urn posterior after that draw) * P(colour | urn).

    Input:
    num_urns: number of urns in the experiment
    num_col: number of ball colours in the experiment
    max_draws: number of draws to evaluate; urn_posteriors must hold an entry for every urn and every draw 1..max_draws
    urn_posteriors: dictionary of urn posteriors after each ball draw, keyed "posterior_u{urn}_draw{draw}"
    ball_comp: a dictionary of ball compositions in each urn. Key: (num_urns, num_col), Values: a dictionary with keys being name of the conditional probability (e.g. p_col1_u1 = probability of colour 1 conditional on urn 1) and values being the conditional probability
    ndigits: number of decimal places the results are rounded to (default 2); pass None to get the unrounded values

    Output: a dictionary {"posterior_col{col}_draw{draw}": probability} with the predicted probability of each colour after each draw
    '''

    assert isinstance(num_urns, int), "num_urns must be integer"
    assert isinstance(num_col, int), "num_col must be integer"

    col_posteriors = {}
    # calculate the weighted average for each colour after each draw
    for draw in range(1, max_draws + 1):
        for col in range(1, num_col + 1):
            col_posteriors[f"posterior_col{col}_draw{draw}"] = sum(
                urn_posteriors[f"posterior_u{urn}_draw{draw}"] * ball_comp[(num_urns, num_col)][f"p_col{col}_u{urn}"]
                for urn in range(1, num_urns + 1)
            )

    if ndigits is None:
        return col_posteriors
    return {key: round(value, ndigits) for key, value in col_posteriors.items()}

In [10]:
# Read the experiment input file back in.
# The encoding is given explicitly so reading does not depend on the
# platform's default locale encoding.
filename = 'input.json'
with open(filename, encoding='utf-8') as json_file:
    data = json.load(json_file)

In [11]:
# calculate posteriors for all instances

def _composition_from_urn_info(urn_info, colour_names):
    """
    Rebuild the ball-composition dictionary of one trial from its own urn table.

    urn_info: the trial's 'urnInfo' list; every entry but the last ('Total')
        describes one urn with a 'composition' such as ['7B', '3W'] and a
        'balls' count.
    colour_names: colour labels in column order; used to strip the label
        suffix from each composition entry.

    Returns {(num_urns, num_col): {'p_col{c}_u{u}': count / balls}}, i.e. the
    same layout as ball_comp, so it can be passed to the posterior functions.
    """
    urn_rows = urn_info[:-1]  # the last entry is the 'Total' row
    probabilities = {}
    for urn_number, urn_row in enumerate(urn_rows, start=1):
        for col_number, entry in enumerate(urn_row['composition'], start=1):
            label = colour_names[col_number - 1]
            count = int(entry[:len(entry) - len(label)])
            probabilities[f"p_col{col_number}_u{urn_number}"] = count / urn_row['balls']
    return {(len(urn_rows), len(urn_info[-1]['composition'])): probabilities}


# Map each colour name to "col1", "col2", etc. (the same for every trial)
color_mapping = {color: f"col{i+1}" for i, color in enumerate(name_cols)}

all_posteriors_dict = {}
for instance in data:
    # key is e.g. 'BU1', value is a dictionary with urnInfo (urnName, prior, composition, balls), chosenUrn and ballDraws
    for key, value in instance.items():
        num_urns = len(value['urnInfo']) - 1
        num_col = len(value['urnInfo'][-1]['composition'])

        ball_draws = value['ballDraws']

        # Map each drawn colour to its corresponding "col{n}" label
        mapped_ball_draws = [color_mapping[color] for color in ball_draws]

        # Use the composition stored with the trial itself. The practice trial
        # is generated from a different composition than the main design, so
        # looking it up in ball_comp would score it against the wrong urns.
        trial_comp = _composition_from_urn_info(value['urnInfo'], name_cols)

        # calculate urn posteriors
        urn_posteriors = bayesian_Updating_Urns(num_urns, num_col, mapped_ball_draws, trial_comp)

        # calculate colour posteriors for exactly the draws this trial contains
        col_posteriors = ColGuess(num_urns, num_col, len(ball_draws), urn_posteriors, trial_comp)

        # concatenate urn and colour posteriors dictionary
        concat_posteriors = {**urn_posteriors, **col_posteriors}

        # store under the trial key
        all_posteriors_dict[key] = concat_posteriors

all_posteriors_dict

{'BU1': {'posterior_u1_draw1': 0.3,
  'posterior_u2_draw1': 0.7,
  'posterior_u1_draw2': 0.5,
  'posterior_u2_draw2': 0.5,
  'posterior_u1_draw3': 0.3,
  'posterior_u2_draw3': 0.7,
  'posterior_col1_draw1': 0.42,
  'posterior_col2_draw1': 0.58,
  'posterior_col1_draw2': 0.5,
  'posterior_col2_draw2': 0.5,
  'posterior_col1_draw3': 0.42,
  'posterior_col2_draw3': 0.58},
 'BU2': {'posterior_u1_draw1': 0.7,
  'posterior_u2_draw1': 0.3,
  'posterior_u1_draw2': 0.84,
  'posterior_u2_draw2': 0.16,
  'posterior_u1_draw3': 0.93,
  'posterior_u2_draw3': 0.07,
  'posterior_col1_draw1': 0.58,
  'posterior_col2_draw1': 0.42,
  'posterior_col1_draw2': 0.64,
  'posterior_col2_draw2': 0.36,
  'posterior_col1_draw3': 0.67,
  'posterior_col2_draw3': 0.33},
 'BU3': {'posterior_u1_draw1': 0.67,
  'posterior_u2_draw1': 0.33,
  'posterior_u1_draw2': 0.87,
  'posterior_u2_draw2': 0.12,
  'posterior_u1_draw3': 0.96,
  'posterior_u2_draw3': 0.04,
  'posterior_col1_draw1': 0.53,
  'posterior_col2_draw1': 0.17,

In [12]:
# Convert the posteriors dictionary to a JSON string
pos_json = json.dumps(all_posteriors_dict, indent=4)

# Write the JSON string to the output file.
# The encoding is given explicitly so the file does not depend on the
# platform's default locale encoding.
filename = 'obj_posteriors.json'
with open(filename, 'w', encoding='utf-8') as json_file:
    json_file.write(pos_json)