# Local settings

In [None]:
data_dir = 'raw_data/kana_colourpreferencequalia-master/data'

# Specify the data column to focus on throughout by default
data_column = 'responseTime' # 'response' or 'responseTime'

# Specify which type of responses to filter for
response_type = 'preference' # 'similarity' or 'preference'

# Import Packages

In [None]:
import os
import pickle as pkl
import sys
sys.path.append('/Users/kana/Library/Mobile Documents/com~apple~CloudDocs/Codes/GWOT_colorprefrencequalia')
import numpy as np
import pandas as pd
import sklearn
import csv
from scipy.stats import pearsonr, spearmanr
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from mpl_toolkits.mplot3d import Axes3D
from sklearn.manifold import MDS
import seaborn as sns
import ot
import plotly.graph_objs as go
import plotly.express as px
from mpl_toolkits.axes_grid1 import make_axes_locatable
import math

# Our own utility functions
import utilityFunctions

# Define variables/functions (used across the code)

In [None]:
# Define unique colors
# TODO - remove after confirming that it is no longer needed (replaced by function)
unique_colours = np.array(['#d2b700', '#db8b08', '#c7512c', '#c13547', '#a03663', '#753a7a', '#4b488e', '#005692', '#006a8b', '#007b75', '#008a52', '#9aa400'])
colour_index = {colour: idx for idx, colour in enumerate(unique_colours)}
matrix_size = len(unique_colours)

# Provides dictionary of colours, with an assigned id for each colour
# OUTPUTS:
#   colour_index: dictionary, {colour_string : colour_id}
def getUniqueColours():
    unique_colours = np.array(['#d2b700', '#db8b08', '#c7512c', '#c13547', '#a03663', '#753a7a', '#4b488e', '#005692', '#006a8b', '#007b75', '#008a52', '#9aa400'])
    colour_index = {colour: idx for idx, colour in enumerate(unique_colours)}
    return colour_index

### configuration
n_eps = 15 # number of epsilon values tried
eps_range = [0.04, 5] # the range of epsilon searched
epsilons = np.logspace(np.log10(eps_range[0]), np.log10(eps_range[1]), n_eps) # epsilon values

# Set default font size
plt.rcParams.update({'font.size': 14})

# Function(s) for loading and formatting data

In [None]:
# Function for getting data

# Extract data from datafiles in provided directory
#   Data is extracted in the colour x colour matrix form
# INPUTS:
#   data_dir: string, directory holding csv data files
#   target_data: string, target column name in data files which has the data you want to get
#   response_type: string, select data for 'similarity' or 'preference'
# OUTPUTS:
#   pMatrices:
#   pIds:
#   pFiles:
# TODO add parameter which allows for filtering of data based on specified column-value pairs
def getDataColourMatrix(data_dir, target_data, response_type):

    pFiles = [] # stores source datafile for each participant, pFiles[pID] gives the file for participant pID
    pMatrices = [] # stores data matrix for each participant

    pCounter = 0 
    for filename in utilityFunctions.sort_files_in_directory(data_dir):# sorted(os.listdir(data_dir), reverse=False):
        if filename.endswith(".csv"):
            filepath = os.path.join('raw_data/kana_colourpreferencequalia-master/data', filename)

            # This is another participant
            pFiles.append(filename)
            pCounter = pCounter + 1
            
            # Load the CSV file
            df = pd.read_csv(filepath)

            # Omit practice trials
            df = df[df['practice_trial'] != 1]

            # Filter for rows where 'response_type' is 'similarity' or 'preference'
            df_similarity = df[df['response_type'] == response_type]

            # Extract columns
            # colour1 is the left colour, colour2 is the right colour
            colour1 = df_similarity['colour1']
            colour2 = df_similarity['colour2']
            target_preference = df_similarity[target_data]

            """
            # Create and fill the matrix
            # NOTE here, double pass trials will overwrite the first trials (is this what we want?)
            colour_index = getUniqueColours()
            matrix_size = len(colour_index)
            matrix = np.zeros((matrix_size, matrix_size))
            for c1, c2, tp in zip(colour1, colour2, target_preference):
                I = colour_index[c1]
                j = colour_index[c2]
                matrix[j, I] = tp
            """

            # Create and fill the matrix
            # For double pass trials, take the average between the first and second pass
            colour_index = getUniqueColours()
            matrix_size = len(colour_index)
            matrix = np.zeros((matrix_size, matrix_size))
            matrix_entryCount = np.zeros((matrix_size, matrix_size))
            for c1, c2, tp in zip(colour1, colour2, target_preference):
                i = colour_index[c1] # rows correspond to left colour
                j = colour_index[c2] # cols correspond to right colour
                matrix[i, j] = matrix[i, j] + tp
                matrix_entryCount[i, j] = matrix_entryCount[i, j] + 1
            matrix = np.divide(matrix, matrix_entryCount)

            # Store the matrix
            pMatrices.append(matrix)

    pIds = range(0, pCounter)

    return pMatrices, pIds, pFiles

# Compute double pass correlations from datafiles in provided directory
# INPUTS:
#   data_dir: string, directory holding csv data files
#   response_type: string, specify to compute correlations for
#    'similarity' or 'preference' ratings
# OUTPUTS:
#   pCorrs: vector of double-pass correlations for each participant
def getDoublePassCorrelations(data_dir, response_type):
    
    pCorrs = []

    for filename in os.listdir(data_dir):
        if filename.endswith(".csv"):
            filepath = os.path.join(data_dir, filename)
            
            df = pd.read_csv(filepath, usecols=['response_type', 'response', 'colour1', 'colour2', 'trials.thisIndex', 'practice_trial'])

            # Omit practice trials
            df = df[df['practice_trial'] != 1]

            df_similarity = df[df['response_type'] == response_type]

            # Group by 'trials.thisIndex'
            grouped = df_similarity.groupby('trials.thisIndex')['response'].apply(list).reset_index()

            # Filter for pairs with exactly two trials
            repeated_pairs = grouped[grouped['response'].apply(len) == 2]

            # Check if there are any repeated pairs
            if len(repeated_pairs) > 0:
                # Flatten the responses for repeated pairs
                responses_trial1 = np.array(repeated_pairs['response'].apply(lambda x: x[0]).tolist())
                responses_trial2 = np.array(repeated_pairs['response'].apply(lambda x: x[1]).tolist())

                # Calculate overall correlation for repeated color pairs
                overall_correlation = pearsonr(responses_trial1, responses_trial2)[0]

                pCorrs.append(overall_correlation)

                #print(f"Overall {response_type} Pearson Correlation for {filename}: {overall_correlation}")
            else:
                print(f"No repeated pairs found in {filename}")

    return pCorrs

# Display multiple matrices as a subplot
#
# INPUTS:
#   vmin_val: number, minimum of colour scale
#   vmin_val: number, maximum of colour scale
#   matrices: list, list of 2D numpy matrices
#   rows: integer, how many subplot rows
#   cols: integer, how many subplot columns
#   titles: list, list of title strings for each subplot
#   cbar_label: string, title for colour bars
#   color_labels: dictionary, dictionary of colours and their ids (x/y axis position), {colour:id}
# OUTPUTS:
#   Returns nothing, just plots
def show_heatmaps(vmin_val, vmax_val, matrices, rows, cols, titles, cbar_label=None, color_labels=None):
    num_plots = len(matrices)
    grid_size = math.ceil(math.sqrt(num_plots))  # Determine the grid size
    fig, axs = plt.subplots(rows, cols, figsize=(16, 9)) #figsize=(5 * grid_size, 5 * grid_size))

    textsize = (14*6) * (1/cols) # scale font size based on number of panels

   # Flatten the axes array if it is 2D
    if isinstance(axs, np.ndarray):
        axs = axs.ravel()
    else:
        axs = [axs]

    for i, (matrix, title) in enumerate(zip(matrices, titles)):
        ax = axs[i]
        
        im = ax.imshow(matrix, aspect='auto', vmin=vmin_val, vmax=vmax_val)
        ax.set_title(title, fontsize=textsize)

        # Set axis labels
        ax.set_xlabel("Right")  # Label for x-axis
        ax.set_ylabel("Left")   # Label for y-axis
        ax.set_xticks([])
        ax.set_yticks([])

        # Shift the x and y labels to avoid overlap with colored labels
        ax.xaxis.set_label_coords(0.55, 0.025)  # Adjust xlabel position to account for colour labels
        ax.yaxis.set_label_coords(0.075, 0.55)  # Adjust ylabel position to account for colour labels

        ax.axis('square')

        if color_labels is not None:
            #ax.axis('off')
            for idx, color in enumerate(color_labels):
                utilityFunctions.add_colored_label(ax, -1.5, idx - 0.5, color, width=0.8)
                utilityFunctions.add_colored_label(ax, idx - 0.5, matrix.shape[1] - 0.2, color, height=0.8)

            ax.set_aspect('equal')
            ax.set_xlim(-3.0, matrix.shape[1])
            ax.set_ylim(-1, len(color_labels) + 1.4)
            ax.invert_yaxis()

            for spine in ax.spines.values():
                spine.set_visible(False)
        
        # Add colorbar
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="5%", pad=0.05)
        cbar = fig.colorbar(im, cax=cax)
        cbar.set_label(cbar_label, fontsize=textsize)
        cbar.ax.tick_params(labelsize=textsize)

        # Adjust colorbar height to match the axis height dynamically
        ax_pos = ax.get_position()  # Get the subplot's position
        cax_pos = cax.get_position()  # Get the colorbar's position
        cax.set_position([cax_pos.x0, ax_pos.y0, cax_pos.width, ax_pos.height])  # Match height

        """
        # Adjust the height of the color bar
        position = cax.get_position()
        new_position = [position.x0, position.y0 + 0.1, position.width, position.height * 0.8]
        cax.set_position(new_position)
        """
    # Hide unused axes
    for ax in axs[num_plots:]:
        ax.axis('off')

    plt.tight_layout()
    plt.show()

# Load data

In [None]:
## Create nested dictionary "data" holding response matrices for each participant
# data['similarity' | 'preference']['response' | 'responseTime']

dataMatrices = dict()
dataParticipants = dict()
dataFiles = dict()
dataMatricesMean = dict() # Mean across participants

response_types = ['similarity', 'preference']
data_columns = ['response', 'responseTime']

for r in response_types:
    dataMatrices[r] = dict()
    dataParticipants[r] = dict()
    dataFiles[r] = dict()
    dataMatricesMean[r] = dict()
    for d in data_columns:
        pMatrices, pIds, pFiles = getDataColourMatrix(data_dir, d, r)
        dataMatrices[r][d] = pMatrices
        dataParticipants[r][d] = pIds
        dataFiles[r][d] = pFiles

        # Average across participants
        pMatrices_cat = np.stack(pMatrices, axis=2)
        dataMatricesMean[r][d] = np.mean(pMatrices_cat, axis=2)


## Compute double pass correlations for each participant and response type

pCorrs = dict()
for r in response_types:
    pCorrs[r] = getDoublePassCorrelations(data_dir, r)



# Show response matrices

In [None]:
# Get data
# Matrices, pIds, pFiles = getDataColourMatrix(data_dir, data_column, response_type)

response_type = 'preference' # 'similarity' or 'preference'
data_column = 'response' # 'response' or 'responseTime'

pMatrices = dataMatrices[response_type][data_column]
pIds = dataParticipants[response_type][data_column]
pFiles = dataFiles[response_type][data_column]

if data_column == 'responseTime':
    cLabel = 'response time (s)'
else:
    cLabel = response_type

# Show matrices
if data_column == 'response':
    # Range of raw response values
    colour_min = 0
    colour_max = 7
elif data_column == 'responseTime':
    # Range of response time values (0 is min)
    colour_min = 0
    colour_max = 5

show_heatmaps(colour_min, colour_max, pMatrices, 4, 6, titles=["subject" + str(p) for p in pIds], cbar_label= cLabel, color_labels=unique_colours)

for f in range(0, len(pFiles)):
    print("subject" + str(f) + ' ' + pFiles[f])


In [None]:
# Averaged response matrix

show_heatmaps(colour_min, colour_max, [dataMatricesMean['similarity'][data_column], dataMatricesMean['preference'][data_column]], 1, 2, titles=["mean " + r for r in response_types], cbar_label='rating', color_labels=unique_colours)


print(dataMatricesMean['similarity'][data_column])

In [None]:
# Distribution of responses per participant

num_plots = len(pMatrices)
grid_size = math.ceil(math.sqrt(num_plots))  # Determine the grid size
fig, axs = plt.subplots(4, 6, figsize=(16, 9))#, figsize=(5 * grid_size, 5 * grid_size))
text_fontsize = 14

# Flatten the axes array if it is 2D
if isinstance(axs, np.ndarray):
    axs = axs.ravel()
else:
    axs = [axs]

if data_column == 'responseTime':
    xlabel = 'responseTime (ms)'
else:
    xlabel = data_column

for p in range(0, len(pMatrices)):
    ax = axs[p]

    ax.hist(pMatrices[p].flatten(), bins=50, color='blue')
    ax.set_xlabel(xlabel, fontsize=text_fontsize)
    ax.set_ylabel('count', fontsize=text_fontsize)
    ax.set_title('subject' + str(p), fontsize=text_fontsize)

    ax.tick_params(axis='both', labelsize=text_fontsize)

# Hide unused axes
for ax in axs[num_plots:]:
    ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Plot mean response of each participant against variance

response_type = 'preference' # 'similarity' or 'preference'
data_column = 'responseTime' # 'response' or 'responseTime'

pMatrices = dataMatrices[response_type][data_column]

means = np.empty(len(pMatrices))
vars = np.empty(len(pMatrices))

for p in range(0, len(pMatrices)):
    pMat = pMatrices[p].flatten()

    # Take log of values if response time
    if data_column == 'responseTime':
        pMat = np.log(pMat)

    means[p] = np.mean(pMat)
    vars[p] = np.var(pMat)

plt.scatter(means, vars)
plt.xlabel(f'mean of {response_type} {data_column}s')
plt.ylabel(f'variance of {response_type} {data_column}s')
plt.title('mean vs variance of responses')

#f"Overall {response_type} Pearson Correlation for {filename}: {overall_correlation}"

In [None]:
ax = sns.jointplot(x=means, y=vars, kind='scatter', marginal_kws=dict(bins=20, fill=True), color='blue')
plt.xlabel(f'mean of {response_type} {data_column}s')
plt.ylabel(f'variance of {response_type} {data_column}s')

plt.suptitle('mean vs variance of responses', fontsize=16, y=1.05)

plt.axis('square')

# Double pass correlations

In [None]:
# Plot distribution of double pass correlations across participants

# response_type = 'similarity' # 'similarity' or 'preference'

plt.hist(pCorrs[response_type], color='blue')
plt.xlabel("r")
plt.ylabel("N")
plt.title('double pass - similarity')


In [None]:
# Plot doublePass(preference) against doublePass(similarity)


plt.scatter(pCorrs['similarity'], pCorrs['preference'])
plt.xlabel('r(sim)')
plt.ylabel('r(pref)')
plt.title('double pass - similarity vs preference')

#plt.axis('tight')
plt.xlim(-0.1, 1)
plt.ylim(-0.1, 1)

plt.plot([-1, 1], [-1, 1], 'k', linewidth=1)  # Black dashed line