# Imports

In [68]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from itertools import permutations, product
from collections import defaultdict
from time import strftime, gmtime
import random
from random import sample

import string
import pysign.utils
from pysign.infos import location_movement

# Params

In [69]:
subject = 4
video_dir = "robot_stimuli"
cdir = os.getcwd()
subject_dir = f"{cdir}\\data\\subject_{subject}"

# Signs

In [70]:
def config_to_name(configuration):
    return f"{configuration['laterality']}_{configuration['localisation']}_{configuration['movement']}_{configuration['direction']}"

def name_to_config(fname):
    return dict(zip(['laterality', 'localisation', 'movement', 'direction'], fname.split('_')))

In [71]:
configurations = {
    'laterality' : ['L', 'R'],
    'localisation' : ['Head', 'Arm', 'Torso', 'Base'],
    'movement' : ['X', 'Y', 'Z'],
    'direction' : ['For', 'Back']
    }

all_configurations = list(product(*configurations.values()))

In [72]:
row_list = []

all_configurations = list(product(*configurations.values()))

for configuration in all_configurations:
    row = dict(zip(configurations.keys(), configuration))
    row_list.append(row)

movement_df = pd.DataFrame(row_list)

movement_df["fname"] = movement_df.apply(config_to_name, axis=1)

movement_df

Unnamed: 0,laterality,localisation,movement,direction,fname
0,L,Head,X,For,L_Head_X_For
1,L,Head,X,Back,L_Head_X_Back
2,L,Head,Y,For,L_Head_Y_For
3,L,Head,Y,Back,L_Head_Y_Back
4,L,Head,Z,For,L_Head_Z_For
5,L,Head,Z,Back,L_Head_Z_Back
6,L,Arm,X,For,L_Arm_X_For
7,L,Arm,X,Back,L_Arm_X_Back
8,L,Arm,Y,For,L_Arm_Y_For
9,L,Arm,Y,Back,L_Arm_Y_Back


# N-Back

## Params

In [73]:
#Global params
video_dir = "robot_stimuli"
subject = 4
n_values = [1,2,3]

#Main task
n_block = 3
target_number = 10
n_trial = target_number*5

#Training
training_block = 1
training_target_number = 2
training_trial = training_target_number*3

## Time estimation

In [74]:
#Global times
block_intro_duration = 60

#Letter times
letter_stimuli_duration = 2
letter_inter_stimuli = 1

#Robot times
robot_stimuli_duration = 3.5
robot_inter_stimuli = 1

letter_block_durations = (letter_stimuli_duration + letter_inter_stimuli)*n_trial
robot_block_durations = (robot_stimuli_duration + robot_inter_stimuli)*n_trial

print(f"A total of {n_block*n_trial} trials for each condition ({len(n_values)}) and each stimuli type (2)")
print(f"Each condition ({len(n_values)}) with letter stimuli last {strftime('%X', gmtime(letter_block_durations))}")
print(f"Each condition ({len(n_values)}) with robot stimuli last {strftime('%X', gmtime(robot_block_durations))}")

print(f"Estimated robot duration {strftime('%X', gmtime((robot_block_durations-block_intro_duration)*n_block*len(n_values)))} / {strftime('%X', gmtime(robot_block_durations*n_block*len(n_values)))}")
print(f"Estimated letter duration {strftime('%X', gmtime((letter_block_durations-block_intro_duration)*n_block*len(n_values)))} / {strftime('%X', gmtime(letter_block_durations*n_block*len(n_values)))}")

print(f"Estimated total duration {strftime('%X', gmtime((robot_block_durations+letter_block_durations-2*block_intro_duration)*n_block*len(n_values)))} / {strftime('%X', gmtime((robot_block_durations+letter_block_durations)*n_block*len(n_values)))}")

A total of 150 trials for each condition (3) and each stimuli type (2)
Each condition (3) with letter stimuli last 00:02:30
Each condition (3) with robot stimuli last 00:03:45
Estimated robot duration 00:24:45 / 00:33:45
Estimated letter duration 00:13:30 / 00:22:30
Estimated total duration 00:38:15 / 00:56:15


## Utils

In [75]:
def get_block_df(condition_df_list):
    condition_sequence =  [df['condition'].unique()[0] for df in condition_df_list]

    counter_dict = defaultdict(int)
    block_sequence = []
    for condition in condition_sequence:
        counter_dict[condition] += 1
        block_sequence.append(counter_dict[condition])
    
    for i, df in enumerate(condition_df_list):
        df['block'] = block_sequence[i]
        df['is_block_beginning'] = [True if i==0 else False for i in range(len(df))]

    return pd.concat(condition_df_list).reset_index(drop=True)

In [76]:
def is_target(sequence, n_value):
    return np.array([sequence[i-n_value]==sequence[i] and i-n_value>=0 for i in range(len(sequence))])

def n_back(sequence_len, target_number, n_value, items, seed=None):
    random.seed(seed)
    sequence = np.array(random.choices(items, k=sequence_len))
    idxs = sorted(random.choices(range(sequence_len-n_value), k=target_number))

    for idx in idxs:
        sequence[idx+n_value] = sequence[idx]

    is_target_idx = np.where(is_target(sequence, n_value))[0]

    while len(is_target_idx)!=target_number:
        if len(is_target_idx)>target_number:
            drop_idxs = random.choices(idxs, k=len(is_target_idx)-target_number)
            for idx in drop_idxs:
                sequence[idx]=random.choice(items)
        else:
            not_target_idx = np.where(is_target(sequence, n_value)==False)[0]
            not_target_idx = not_target_idx[not_target_idx<sequence_len-n_value]
            add_idxs = random.choices(not_target_idx, k=target_number-len(is_target_idx))
            for idx in add_idxs:
                sequence[idx+n_value]=sequence[idx]

        is_target_idx = np.where(is_target(sequence, n_value))[0]

    df = pd.DataFrame({
            'sequence':sequence, 
            'is_target':is_target(sequence, n_value),
            # 'theoric_target':[i-n_value in idxs for i in range(len(sequence))],
            'condition':n_value
            })

    return df

def analyze_sequence(sequence_df):
    n_value = sequence_df['condition'][0]
    sequence = sequence_df['sequence'].values
    sequence_df['is_lure'] = np.any((is_target(sequence, n_value-1), is_target(sequence, n_value+1)), axis=0)
    

    interpretor = {
    (False, False): 'none',
    (True, False):'target',
    (False, True):'lure',
    (True, True):'both'
    }

    summary = sequence_df.apply(lambda x: interpretor[(x['is_target'], x['is_lure'])], axis=1).value_counts()

    return summary


# sequence_df = n_back(n_trial, target_number, n_values[2], items=item_dict['robot'], seed=None)
# sequence_df
# final_df = analyze_sequence(sequence_df)
# final_df


## Computation

In [77]:
def get_conditions(subject, n_trial, target_number, n_values, n_block, stimuli_order, item_dict, video_dir, verbose=False):
    df_list = []

    for block in range(n_block):
        for stimuli_type in stimuli_order:
            for n_value in sample(n_values, len(n_values)):
                if verbose:
                    print(f"{n_value}-back, block {block}, {stimuli_type} stimuli")
                df = n_back(n_trial, target_number, n_value, items=item_dict[stimuli_type], seed=None)

                #Metadata
                df['stimuli'] = stimuli_type
                df['block'] = block+1
                df['is_block_beginning'] = [True if i==0 else False for i in range(len(df))]
                df['is_block_ending'] = [True if i==(n_trial-1) else False for i in range(len(df))]

                df_list.append(df)
                
                if verbose:
                    print(np.where(df['is_target'].values)[0])
                    print('-------------------------------------------------------------')
    
    nback_df = get_block_df(df_list)
    nback_df['fname'] = nback_df['sequence'].apply(lambda x : f"{video_dir}\\{x}.mp4" if len(x)>1 else f"{video_dir}\\\\filler.mp4")
    nback_df['subject'] = subject

    return nback_df

In [78]:
def get_training(subject, n_trial, target_number, n_values, n_block, stimuli_order, item_dict, video_dir, verbose=False):
    df = get_conditions(subject=subject, n_trial=n_trial, target_number=target_number, n_values=n_values, n_block=n_block, 
                                    stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=verbose)
    
    df['stimuli'] = pd.Categorical(df['stimuli'], categories=['letter', 'robot'], ordered=True)
    df = df.sort_values(by=['stimuli', 'condition'])
    
    return df

In [79]:
item_dict = {
    'robot':movement_df['fname'],
    'letter':[elem for elem in string.ascii_uppercase if elem not in 'AEIOUY']
    }

stimuli_order = ['letter', 'robot']

In [80]:
nback_df = get_conditions(subject=subject, n_trial=n_trial, target_number=target_number, n_values=n_values, n_block=n_block, 
stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=True)

2-back, block 0, letter stimuli
[ 4  5  6 19 21 23 24 26 40 49]
-------------------------------------------------------------
3-back, block 0, letter stimuli
[11 15 17 23 25 30 31 35 43 48]
-------------------------------------------------------------
1-back, block 0, letter stimuli
[ 8 11 12 14 18 26 28 31 36 38]
-------------------------------------------------------------
3-back, block 0, robot stimuli
[ 3  7 12 20 27 29 31 33 40 42]
-------------------------------------------------------------
2-back, block 0, robot stimuli
[ 8  9 13 14 19 29 35 36 42 45]
-------------------------------------------------------------
1-back, block 0, robot stimuli
[ 3  8 14 21 25 27 30 40 44 45]
-------------------------------------------------------------
2-back, block 1, letter stimuli
[ 3 10 13 20 27 31 32 33 34 36]
-------------------------------------------------------------
1-back, block 1, letter stimuli
[ 1  5  9 11 18 19 30 44 47 48]
---------------------------------------------------------

In [81]:
nback_training_df = get_training(subject=subject, n_trial=training_trial, target_number=training_target_number, n_values=n_values, n_block=training_block, 
stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=True)

3-back, block 0, letter stimuli
[3 5]
-------------------------------------------------------------
1-back, block 0, letter stimuli
[3 4]
-------------------------------------------------------------
2-back, block 0, letter stimuli
[2 3]
-------------------------------------------------------------
1-back, block 0, robot stimuli
[1 4]
-------------------------------------------------------------
2-back, block 0, robot stimuli
[3 5]
-------------------------------------------------------------
3-back, block 0, robot stimuli
[4 5]
-------------------------------------------------------------


## Saving

In [82]:
def safe_save(df, save_dir, fname, index=False):
    fname_versions = [elem for elem in os.listdir(save_dir) if elem.split('old_')[-1]==fname]
    sorted_fname_versions = sorted(fname_versions, key=len, reverse=True)

    for target_name in sorted_fname_versions:
        os.rename(f"{save_dir}\\{target_name}", f"{save_dir}\\old_{target_name}")

    
    df.to_csv(f"{save_dir}\\{fname}", index=index)

In [83]:
os.makedirs(subject_dir, exist_ok = True)

In [15]:
fname = f"conditions_nback.csv"
safe_save(nback_df, subject_dir, fname, index=False)

In [16]:
fname = f"conditions_nback_short.csv"
safe_save(nback_training_df.head(training_trial), subject_dir, fname, index=False)

In [52]:
fname = f"conditions_nback_training.csv"
safe_save(nback_training_df, subject_dir, fname, index=False)

## Multi-subject

In [84]:
subjects = range(24)

mode = 'hard'

item_dict = {
    'robot':movement_df['fname'],
    'letter':[elem for elem in string.ascii_uppercase if elem not in 'AEIOUY']
    }

for subject in subjects:
    subject_dir = f"{cdir}\\data\\subject_{subject}"

    if subject%2==0:
        stimuli_order = ['letter', 'robot']

        nback_df = get_conditions(subject=subject, n_trial=n_trial, target_number=target_number, n_values=n_values, n_block=n_block, 
                                    stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=False)
        nback_training_df = get_training(subject=subject, n_trial=training_trial, target_number=training_target_number, n_values=n_values, n_block=training_block, 
                            stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=False)

    elif subject%2==1:
        stimuli_order = ['robot', 'letter']

        nback_df = get_conditions(subject=subject, n_trial=n_trial, target_number=target_number, n_values=n_values, n_block=n_block, 
                                    stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=False)
        nback_training_df = get_training(subject=subject, n_trial=training_trial, target_number=training_target_number, n_values=n_values, n_block=training_block, 
                            stimuli_order=stimuli_order, item_dict=item_dict, video_dir=video_dir, verbose=False)

    os.makedirs(subject_dir, exist_ok = True)

    if mode == 'hard':
        nback_df.to_csv(f"{subject_dir}\\conditions_nback.csv", index=False)
        nback_training_df.head(training_trial).to_csv(f"{subject_dir}\\conditions_nback_short.csv", index=False)
        nback_training_df.to_csv(f"{subject_dir}\\conditions_nback_training.csv", index=False)
    
    elif mode == 'safe':
        safe_save(nback_df, subject_dir, "conditions_nback.csv", index=False)
        safe_save(nback_training_df.head(), subject_dir, "conditions_nback_short.csv", index=False)
        safe_save(nback_training_df, subject_dir, "conditions_nback_training.csv", index=False)
