# Sequence generation for the pseudoRWM set of experiments

## Import statements and utilities

In [1]:
import numpy as np
import matlab.engine

In [2]:
# Toggle: build stimulus orders with MATLAB's createstimsequence (True) or in NumPy (False).
use_matlab = False
if not use_matlab:
    # the NumPy fallback only needs repmat
    from numpy.matlib import repmat
else:
    # start an engine session and put the pseudoRWM sequences folder
    # (plus subfolders, via genpath) on the MATLAB path
    m = matlab.engine.start_matlab()
    m.addpath(m.genpath("C:/Gaia/CCN Lab/Utilities/sequences/pseudoRWM/"), nargout=0)

def shuffle_along_axis(arr, axis):
    """Return a copy of `arr` with its entries independently shuffled along `axis`.

    A random key is drawn for every element; sorting those keys along `axis`
    yields a separate random ordering for each 1-D slice, which is then used
    to gather the values of `arr`.
    """
    random_keys = np.random.rand(*arr.shape)
    order = np.argsort(random_keys, axis=axis)
    return np.take_along_axis(arr, order, axis=axis)

## Settings

In [3]:
# Experiment variant; one of "pseudoRWM", "pseudoRWMCtrl", "pseudoRWMConf", "pseudoRWMReps".
exp_type = "pseudoRWM"
# How many distinct sequence files to generate (adjust as needed).
num_conditions = 10

# Stimulus repetitions (after the first presentation) for each variant.
reps_dict = {
    "pseudoRWM": 12,
    "pseudoRWMCtrl": 12,
    "pseudoRWMConf": 11,
    "pseudoRWMReps": 11,
}
# Must the repetition count be exact, or may it be exceeded by 1?
exact_reps = exp_type in ["pseudoRWMConf", "pseudoRWMReps"]
# Repetitions used for the selected variant (adjust as needed).
reps = reps_dict[exp_type]

# Destination folder for the generated sequences.
to_dir = f"C:/Gaia/CCN Lab/Utilities/sequences/pseudoRWM/{exp_type}"

## Main sequence creation function

In [30]:
class pseudoRWMSequenceMaker:
    """Build randomized training sequences for the pseudoRWM experiments.

    All work happens in the constructor. For each of `num_conditions`
    sequence files it draws the order of trial types across blocks, the
    stimulus-to-key rules, the image folder and image numbers used in each
    block and a trial-by-trial stimulus order, then assembles one 9-row
    array per condition. Nothing is written to disk by this class.

    Args:
        exp_type: name of the experiment variant (stored, not otherwise used here).
        num_reps: repetitions of each stimulus after its first presentation,
            so every stimulus appears `num_reps + 1` times per block.
        use_matlab: if True, stimulus orders come from the MATLAB function
            `createstimsequence`, called through a module-level engine handle
            `m` that must already exist. If False, a shuffled NumPy sequence
            is used instead.
        num_conditions: number of independent sequences to generate.
        to_dir: output folder (stored, not otherwise used here).

    Attributes:
        blocks: set size of each block.
        trial_types: (num_conditions, num_blocks) array, 1 = Points, 0 = Goals.
        R: per block, how many stimuli are assigned to each key.
        block_rules: per condition, {block: {stimulus index: correct key}}.
        stim_sets: per condition, image-folder number for each block.
        block_stimuli: per condition, {block: {stimulus index: image number}}.
        block_seqprototypes: per condition, {set size: 1-based stimulus order}.
        block_sequences: per condition, {block: image number on every trial}.
        train_seqs: per condition, array of shape (9, total trials).
        unique_stims: per condition, the columns of `train_seqs` where each
            image of each block appears for the first time.
    """

    def __init__(self, exp_type, num_reps, use_matlab=True, num_conditions=10, to_dir=""):

        # ================ SETTINGS ==========================================================================
        self.exp_type = exp_type
        self.num_reps = num_reps
        self.num_conditions = num_conditions
        self.to_dir = to_dir
        self.use_matlab = use_matlab

        self.num_keys = 3
        self.max_stims = 6

        # ================ BLOCKS ============================================================================
        # one entry per block, giving that block's set size
        # 6 blocks: 3 per trial type (Points vs Goals)
        self.blocks = np.array([6, 6, 6, 6, 6, 6])
        self.num_blocks = len(self.blocks)

        # ================ TRIAL TYPES =======================================================================
        self.trial_types = self._make_trial_types()

        # ================ RULES =============================================================================
        # R: one row per block, one column per keyboard key; each entry is the
        # number of stimuli whose correct response is that key, e.g. 1, 2, 3
        # means 1 stimulus for key 0, 2 for key 1 and 3 for key 2 (6 in total)
        self.R = np.vstack((
            np.array([1, 2, 3]),
            np.array([2, 2, 2]),
            np.array([1, 2, 3]),
            np.array([2, 2, 2]),
            np.array([1, 2, 3]),
            np.array([2, 2, 2]),
        ))
        self.block_rules = self._make_block_rules()

        # ================ STIMULI ===========================================================================
        # image folder used by each block, a fresh permutation per condition
        self.stim_sets = [
            np.random.permutation(self.num_blocks) + 1
            for _ in range(self.num_conditions)
        ]
        self.block_stimuli = self._make_block_stimuli()
        self.block_seqprototypes = self._make_seqprototypes()
        self.block_sequences = self._make_block_sequences()

        # ================ TRAINING ARRAYS ===================================================================
        self.train_seqs, self.unique_stims = self._build_training_arrays()
        if self.train_seqs:
            # same diagnostic as before: shape of the last condition's array
            print(self.train_seqs[-1].shape)

    def _make_trial_types(self):
        """Return trial types per condition, shuffled within every pair of blocks."""
        trial_types = np.tile(np.arange(self.num_blocks) % 2, (self.num_conditions, 1))
        for row in trial_types:
            # step over the whole row so the last pair is shuffled too;
            # the slices are views, so the shuffle happens in place
            for start in range(0, self.num_blocks, 2):
                np.random.shuffle(row[start:start + 2])
        return trial_types

    def _make_block_rules(self):
        """Return, per condition, the stimulus-index -> correct-key map of each block."""
        all_rules = []
        for _ in range(self.num_conditions):
            # permute each row so it is not always the same key that gets
            # 1, 2 or 3 stimuli (no effect on the 2, 2, 2 rows)
            counts = np.apply_along_axis(np.random.permutation, 1, self.R)
            rules_i = {}
            for block_i in range(self.num_blocks):
                keys = np.repeat(np.arange(self.num_keys), counts[block_i]).tolist()
                rules_i[block_i] = dict(enumerate(keys))
            all_rules.append(rules_i)
        return all_rules

    def _make_block_stimuli(self):
        """Return, per condition, the stimulus-index -> image-number map of each block."""
        all_stimuli = []
        for _ in range(self.num_conditions):
            stimuli_i = {}
            for block_i, set_size in enumerate(self.blocks):
                images = (np.random.permutation(self.max_stims) + 1)[:set_size]
                stimuli_i[block_i] = dict(enumerate(images))
            all_stimuli.append(stimuli_i)
        return all_stimuli

    def _make_seqprototypes(self):
        """Return, per condition, one 1-based stimulus order for each distinct set size."""
        all_prototypes = []
        for _ in range(self.num_conditions):
            prototypes_i = {}
            for ns in np.unique(self.blocks):
                if self.use_matlab:
                    # `m` is the module-level MATLAB engine handle
                    prototypes_i[ns] = np.squeeze(np.array(
                        m.createstimsequence(m.double(int(self.num_reps)), m.double(int(ns)))
                    )).astype(int)
                else:
                    # worse (but faster) alternative if createstimsequence doesn't work:
                    # every stimulus num_reps + 1 times, in uniformly random order
                    prototype = np.tile(np.arange(1, ns + 1), self.num_reps + 1)
                    np.random.shuffle(prototype)
                    prototypes_i[ns] = prototype
            # exactly one entry per condition, so index s_i always means condition s_i
            all_prototypes.append(prototypes_i)
        return all_prototypes

    def _make_block_sequences(self):
        """Return, per condition, the image number shown on every trial of each block."""
        all_sequences = []
        for s_i in range(self.num_conditions):
            sequences_i = {}
            for block_i, ns in enumerate(self.blocks):
                images = self.block_stimuli[s_i][block_i]
                prototype = self.block_seqprototypes[s_i][ns]
                sequences_i[block_i] = np.array([images[stim - 1] for stim in prototype])
            all_sequences.append(sequences_i)
        return all_sequences

    def _build_training_arrays(self):
        """Assemble the 9-row training array and first-occurrence columns per condition.

        Rows: stimulus, correct key, set size, block number, image folder,
        image number, trial type, goal image, non-goal image. The last two
        rows are left as NaN because goal images are not assigned here.
        """
        train_seqs = []
        unique_stims = []
        for s_i in range(self.num_conditions):
            block_arrays = []
            first_columns = []
            for block_i, set_size in enumerate(self.blocks):
                prototype = self.block_seqprototypes[s_i][set_size]
                image_seq = self.block_sequences[s_i][block_i]
                rule = self.block_rules[s_i][block_i]

                # use the instance's own repetition count, not a notebook global;
                # assumes the prototype has exactly this many entries
                block_length = (self.num_reps + 1) * set_size
                this_block = np.full((9, block_length), np.nan)
                this_block[0] = prototype  # stimulus number
                this_block[1] = [rule[stim - 1] for stim in prototype]  # correct key
                this_block[2] = set_size  # set size
                this_block[3] = block_i + 1  # block number
                this_block[4] = self.stim_sets[s_i][block_i]  # image folder
                this_block[5] = image_seq  # image number
                this_block[6] = self.trial_types[s_i][block_i]  # trial type
                # TODO: rows 7 and 8 (goal / non-goal images) for Goals blocks

                # column of the first trial on which each image appears
                _, first_idx = np.unique(image_seq, return_index=True)
                block_arrays.append(this_block)
                first_columns.append(this_block[:, first_idx])

            train_seqs.append(np.concatenate(block_arrays, axis=1))
            unique_stims.append(np.concatenate(first_columns, axis=1))
        return train_seqs, unique_stims

# Build two conditions without MATLAB; the constructor prints the shape of the
# last condition's training array.
pseudoRWMSequenceMaker("pseudoRWMConf", num_reps=12, num_conditions=2, use_matlab=False)

(9, 468)


<__main__.pseudoRWMSequenceMaker at 0x1d580ec1070>

In [45]:
num_conditions = 2  # how many sequence files to generate (adjust as needed)
reps = 12  # repetitions of each stimulus after its first presentation (adjust as needed)
to_dir = "C:/Gaia/CCN Lab/pseudoR Project/pseudoR Codes/pseudoRWM/sequences/"  # output folder for the sequences


for s_i in range(num_conditions):

    # 6 blocks (3 per type); the array has seven entries because the indices
    # built below run from 1 to 6, so position 0 is never selected
    blocks = np.array([6, 6, 6, 6, 6, 6, 6])

    # three consecutive pairs, each a random ordering of the two trial types
    trial_types = np.concatenate([np.random.permutation(2) for _ in range(3)])

    # 1-based block index: start of each pair plus the trial type drawn for that slot
    block_idx = np.repeat(np.arange(1, len(blocks), 2), 2) + trial_types
    print(block_idx)
    blocks = blocks[block_idx]
    print(blocks)

    max_ns = np.max(blocks)

    # fractal image numbers: two per trial of every block whose trial type is 0,
    # shuffled and split into a goal half and a non-goal half
    all_fracts = np.arange(1, (np.sum(blocks[trial_types == 0]) * 2 * (reps + 1)) + 1)
    np.random.shuffle(all_fracts)
    goal_images = all_fracts[:len(all_fracts) // 2]
    nongoal_images = all_fracts[len(all_fracts) // 2:]

    # per block: number of stimuli for which each of the 3 actions is correct,
    # followed by the set size (last column)
    # the quoted table below is an older rule set, kept for reference; it also
    # covers smaller set sizes where one action can be never correct
    """
    R = np.vstack((
        np.array([1, 1, 0, 2]),
        np.array([0, 1, 1, 2]),
        np.array([2, 0, 1, 3]),
        np.array([1, 1, 1, 3]),
        np.array([1, 1, 1, 3]),
        np.array([1, 2, 1, 4]),
        np.array([1, 1, 2, 4]),
        np.array([1, 2, 2, 5]),
        np.array([2, 1, 2, 5]),
        np.array([2, 2, 1, 5]),
        np.array([2, 2, 2, 6]), 
        np.array([1, 2, 3, 6]
        )))
    """
    R = np.array([
        [1, 2, 3, 6],
        [2, 2, 2, 6],
        [1, 2, 3, 6],
        [2, 2, 2, 6],
        [1, 2, 3, 6],
        [2, 2, 2, 6],
    ])

    # shuffle the three action counts within each row; keep the set-size column in place
    R = np.hstack((shuffle_along_axis(R[:, :3], axis=1), R[:, 3:]))

    print("R", R)
    # line the rule rows up with the permuted blocks, one set size at a time
    realR = np.zeros_like(R)
    for ns in np.unique(blocks):
        realR[blocks == ns] = R[R[:, 3] == ns]
    R = realR
    print("realR", R)

    # rules: for every block, the correct action of each stimulus
    rules = {b: np.repeat(np.arange(3), R[b, :3]).tolist() for b in range(len(blocks))}
    print(rules)

[2 1 3 4 6 5]
[6 6 6 6 6 6]
R [[3 1 2 6]
 [2 2 2 6]
 [1 3 2 6]
 [2 2 2 6]
 [1 3 2 6]
 [2 2 2 6]]
realR [[3 1 2 6]
 [2 2 2 6]
 [1 3 2 6]
 [2 2 2 6]
 [1 3 2 6]
 [2 2 2 6]]
{0: [0, 0, 0, 1, 2, 2], 1: [0, 0, 1, 1, 2, 2], 2: [0, 1, 1, 1, 2, 2], 3: [0, 0, 1, 1, 2, 2], 4: [0, 1, 1, 1, 2, 2], 5: [0, 0, 1, 1, 2, 2]}
[1 2 3 4 6 5]
[6 6 6 6 6 6]
R [[2 1 3 6]
 [2 2 2 6]
 [1 2 3 6]
 [2 2 2 6]
 [1 2 3 6]
 [2 2 2 6]]
realR [[2 1 3 6]
 [2 2 2 6]
 [1 2 3 6]
 [2 2 2 6]
 [1 2 3 6]
 [2 2 2 6]]
{0: [0, 0, 1, 2, 2, 2], 1: [0, 0, 1, 1, 2, 2], 2: [0, 1, 1, 2, 2, 2], 3: [0, 0, 1, 1, 2, 2], 4: [0, 1, 1, 2, 2, 2], 5: [0, 0, 1, 1, 2, 2]}


In [38]:
# NOTE(review): this call relies on the default use_matlab=True, so the MATLAB
# engine handle `m` must exist for sequence-prototype creation to run.
pseudoRWMSequenceMaker("pseudoRWMConf", 11)

ValueError: operands could not be broadcast together with shapes (6,) (7,) 

In [None]:
# Earlier procedural version of the sequence generator, kept only as an inert
# string literal: nothing between the triple quotes is executed.
# NOTE(review): the save/diagnostic lines refer to `si`, while the loop
# variable is `s_i` — fix before re-enabling.
# NOTE(review): `repmat(...)` is not indexed with [0] here, so
# `np.random.shuffle` is applied to a one-row 2-D array — TODO confirm the
# prototype is actually shuffled in the non-MATLAB branch.
# NOTE(review): `blocks` is read inside the loop before it is assigned there,
# so it must already exist from an earlier cell.
'''
for s_i in range(num_conditions):

    trial_types = []
    for tp in range(3):
        for tt in np.random.permutation(2):
            trial_types.append(tt)
    trial_types = np.array(trial_types)
    block_idx = np.hstack(((np.repeat(np.arange(1, len(blocks), 2), 2) + trial_types)))
    blocks = blocks[block_idx]

    max_ns = np.max(blocks)

    all_fracts = np.arange(1, (np.sum(blocks[np.where(trial_types==0)])*2*(reps+1))+1)  # all fractal image numbers n 
    np.random.shuffle(all_fracts)
    goal_images = all_fracts[0:len(all_fracts)//2] 
    nongoal_images = all_fracts[len(all_fracts)//2:]

    # how many stimuli for which action (3 total) is correct + set size, organized by block
    # make sure that sometimes one action is totally incorrect (for ns 3)
    
    R = np.vstack((
        np.array([1, 2, 3, 6]),
        np.array([2, 2, 2, 6]), 
        np.array([1, 2, 3, 6]),
        np.array([2, 2, 2, 6]), 
        np.array([1, 2, 3, 6]),
        np.array([2, 2, 2, 6])
        ))

    R = np.column_stack((shuffle_along_axis(R[:, 0:3], axis=1), R[:, 3])) # mix up stim/action within the rule
    
    # order by permutated blocks
    realR = np.zeros_like(R)
    for ns in np.unique(blocks):
        T =  np.where(blocks==ns)
        realR[T] = R[np.where(R[:, 3]==ns)]
    R = realR

    # rules
    rules = {}
    for b in range(len(blocks)):
        rules[b] = [a for a in range(3) for i in range(R[b, a])]

    # stimulus sets (folders from where images will be taken for each block)
    stim_sets = np.random.permutation(len(blocks))+1
    
    # stimuli
    stimuli = {}
    for b in range(len(blocks)):
        stimuli[b] = (np.random.permutation(max_ns)+1)[0:blocks[b]]

    # stimulus sequences
    # create a prototype for each set size
    seqprototype = {}
    for ns in np.unique(blocks): # replaced with max ns variable
        if use_matlab:
            seqprototype[ns] = np.squeeze(np.array(m.createstimsequence(m.double(int(reps)), m.double(int(ns))))).astype(int)
        else:
            # worse alternative if createstimsequence doesn't work
            temp_seqprototype = repmat(np.arange(1, ns+1), 1, reps+1)
            np.random.shuffle(temp_seqprototype)
            seqprototype[ns] = temp_seqprototype

    # randomize which stimuli happen at which position
    stim_seqs = {}
    for b in range(len(blocks)):
        arr = np.random.permutation(blocks[b])+1
        stim_seqs[b] = np.squeeze(np.array([arr[i-1] for i in seqprototype[blocks[b]]]))

    # create csv
    # rows: stim, correct key, set size, blocks, img_folders, img_nums, trial_type, goal_img, nongoal_img
    goal_stim_count = 0
    for b in range(len(blocks)):
        set_size = blocks[b]
        block_length = (reps + 1) * set_size  # number of trials in a block
        this_block = np.full((9, block_length), np.nan)

        block_stims = stim_seqs[b]
        _, unique_idx = np.unique(block_stims, return_index=True)
        block_rules = rules[b]
        block_imgs = stimuli[b]
        block_cond = trial_types[b]
        
        this_block[0] = block_stims
        this_block[2] = np.repeat(set_size, block_length)  # set size
        this_block[3] = np.repeat(b+1, block_length)  # block number
        this_block[4] = np.repeat(stim_sets[b], block_length)  # image folder
        this_block[6] = np.repeat(block_cond, block_length)

        for st in range(set_size):
            this_block[1, np.where(block_stims==st+1)] = block_rules[st]  # correct key
            this_block[5, np.where(block_stims==st+1)] = block_imgs[st]  # stimulus image number
        
        if block_cond == 0:
            this_block[7] = goal_images[goal_stim_count:goal_stim_count+block_length]
            this_block[8] = nongoal_images[goal_stim_count:goal_stim_count+block_length]
            goal_stim_count += block_length

        if b == 0:
            train_seq = this_block
            unique_stims = this_block[:, unique_idx]
        else:
            train_seq = np.column_stack((train_seq, this_block))
            unique_stims = np.column_stack((unique_stims, this_block[:, unique_idx]))
    
    # For the last version of task, test sequence consisted of 3 subsequences which each contained all task stimuli 
    # in randomized order: testing presentation not counterbalanced like training seq
    test_seq = np.column_stack((unique_stims[:, np.random.permutation(unique_stims.shape[1])], 
                                unique_stims[:, np.random.permutation(unique_stims.shape[1])], 
                                unique_stims[:, np.random.permutation(unique_stims.shape[1])]))

    # checks
    # check that unique stims and metadata in test and train are the same
    short_train_seq = train_seq[0:6]
    short_test_seq = test_seq[0:6]
    unique_train, train_freq = np.unique(short_train_seq[:, short_train_seq[0].argsort()], return_counts=True, axis=1)
    unique_test, test_freq = np.unique(short_test_seq[:, short_test_seq[0].argsort()], return_counts=True, axis=1)
    same_stims = np.all(unique_train == unique_test)

    # check that each unique stim is repeated 13 times in train and 3 in test
    train_13 = np.all(np.isin(np.unique(train_freq), np.array([reps, reps+1, reps+2])))
    test_3 = np.all(np.unique(test_freq) == 3)

    # check that sequences are the correct length
    train_size_13 = train_seq.shape[1]//np.sum(blocks) == reps+1
    test_size_3 = test_seq.shape[1]//np.sum(blocks) == 3

    if same_stims and train_13 and test_3 and train_size_13 and test_size_3:
        np.savetxt(f"{to_dir}seq{si+1}_learning.csv", train_seq, delimiter=",")
        np.savetxt(f"{to_dir}seq{si+1}_testing.csv", test_seq, delimiter=",")
    else:
        if not same_stims:
            print(f"same_stims check failed for condition {si+1}")
        if not train_13:
            print(f"train_13 check failed for condition {si+1}")
        if not test_3:
            print(f"test_3 check failed for condition {si+1}")
        if not train_size_13:
            print(f"train_size_13 check failed for condition {si+1}")
        if not test_size_3:
            print(f"test_size_3 check failed for condition {si+1}")'''

## Quit MATLAB if needed

In [84]:
# Shut down the MATLAB engine session `m`; it is only created when
# use_matlab is True, hence the guard.
if use_matlab:
    m.exit()