In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
import os

In [3]:
import sys

def get_n_dir_up(path, n):
    for _ in range(n):
        path = os.path.dirname(path)
    return path

CUR_PATH= os.path.abspath("__file__")
sys.path.append(os.path.join(get_n_dir_up(CUR_PATH, 2)))

In [4]:
import json

DEFAULT_DATA_FOLDER = os.path.join(
    get_n_dir_up(CUR_PATH, 3), 'data')
with open(os.path.join(DEFAULT_DATA_FOLDER, 'QA.json')) as f:
    qa_records = json.load(f)

invalid_subjs = qa_records['invalid']

In [5]:
DEFAULT_FEATURE_FOLDER = os.path.join(
    get_n_dir_up(CUR_PATH, 1), 'features')

def load_subject_time_feature(subj, time_steps, normalize=True):
    feature_loaded = []
    subj_result_folder = os.path.join(DEFAULT_FEATURE_FOLDER, f'{int(subj)}')
    subj_combined_feature_path = os.path.join(subj_result_folder, 'combined')
    for tid in time_steps:
        loaded = np.load(os.path.join(subj_combined_feature_path, f'{tid}.npy'))
        feature_loaded.append(loaded)
    features = np.mean(feature_loaded, axis=0) # time average

    if normalize:
        # Z-score normalization
        means = np.mean(features, axis=0)  # Mean of each column
        stds = np.std(features, axis=0)    # Standard deviation of each column
        features = (features - means) / (stds + 1e-5)
        
    # also the behavior data
    behavior_data = pd.read_csv(os.path.join(subj_result_folder, 'behavior.csv'))

    return features, behavior_data

In [6]:
all_subjs = os.listdir(DEFAULT_FEATURE_FOLDER)
all_subjs = [subj for subj in all_subjs if subj.isdigit()]
all_subjs = [subj for subj in all_subjs if int(subj) not in invalid_subjs]

### Firstly, get the binned patterns using training data

### convert degrees to-from bin id

In [10]:
from utils.distrib import color_smart_diff, color_smart_diff_outer
from scipy.spatial.distance import cdist

smart_diff = lambda x1, x2: color_smart_diff(x1, x2, vmin=-90, vmax=90)
smart_diff_outer = lambda x1, x2: color_smart_diff_outer(x1, x2, vmin=-90, vmax=90)

class DistFunctions:
    """ define all distance function """
    @classmethod
    def diff(cls, x1, x2, dist_name, pairwise):
        if dist_name == 'cos':
            return cls.cos_diff(x1, x2, pairwise)
        elif dist_name == 'rad':
            return cls.scaled_rad_diff(x1, x2, pairwise)
        elif dist_name == 'euc':
            return cls.euclidean_diff(x1, x2, pairwise)
        else:
            raise NotImplementedError(f'Unknown distance {dist_name}')

    @classmethod
    def cos_diff(cls, x1, x2, pairwise):
        x1 = x1 / np.linalg.norm(x1, axis=-1, keepdims=True)
        x2 = x2 / np.linalg.norm(x2, axis=-1, keepdims=True)
        if pairwise:
            dists = cdist(x1, x2, metric='cosine')
        else:
            similarity = np.sum(x1 * x2, axis=-1)
            dists = 1 - similarity
        return dists
    
    @classmethod
    def euclidean_diff(cls, x1, x2, pairwise):
        if pairwise:
            dists = cdist(x1, x2, metric='euclidean')
        else:
            diffs = x1 - x2
            dists = np.linalg.norm(diffs, axis=-1)
        return dists
    
    @classmethod
    def scaled_rad_diff(cls, x1, x2, pairwise):
        if pairwise:
            dists = np.abs(smart_diff_outer(x1, x2))
        else:
            dists = np.abs(smart_diff(x1, x2))
        dists = np.deg2rad(dists)
        return dists

In [None]:
from scipy.stats import zscore

def z_normalize(x):
    x = zscore(x, axis=0, nan_policy='omit')
    x = np.nan_to_num(x) # avoid nan
    return x

class ConfidenceBinConversion:
    def __init__(self, n_bins):
        self.n_bins = n_bins
        self.bins = np.linspace(0, 180, n_bins+1)
        self.bin_center = (self.bins[1:] + self.bins[:-1]) / 2

    def degree_to_bindiffs(self, degs):
        diffs = DistFunctions.diff(
            degs, self.bin_center, 
            dist_name='rad', pairwise=True)
        return diffs
    
    # convert degree to discrete labels
    def degree_to_binid(self, degs, dist_metric):
        diffs = self.degree_to_bindiffs(degs)
        bin_ids = np.argmin(diffs, axis=-1)
        return bin_ids
    
    def binid_to_degree(self, bin_ids):
        return self.bin_center[bin_ids]
    
    # convert to a distribution
    def softmax_dist_to_distrib(self, dists, k):
        distribs = np.exp(np.cos(dists) * k) # von-mises
        distribs /= np.sum(distribs, axis=-1, keepdims=True)
        return distribs
    
    def linear_dist_to_distrib(self, dists, k, offset=1e-2):
        distribs = (dists+offset) ** k
        distribs /= np.sum(distribs, axis=-1, keepdims=True)
        return distribs

In [None]:
def convert_ys_to_labels(multi_ys):
    # ys shape: N x k

def convert_ys_to_distribs(multi_ys):
    # convert 

In [None]:
import numpy as np
from sklearn.svm import SVR

def degress_to_cos_sin(degrees):
    # input: 0-180
    degrees = degrees % 180
    degrees = degrees * 2
    rads = np.deg2rad(degrees)
    c, s = np.cos(rads), np.sin(rads)
    return c, s

def cos_sin_to_degrees(cos_data, sin_data):
    rads = np.arctan2(sin_data, cos_data)
    degrees = np.rad2deg(rads)
    degrees = degrees % 360
    degrees = degrees / 2
    return degrees

def convert_stim_to_y(behav_df, stim_name, lmb):
    # masking
    stims = behav_df[stim_name].to_numpy()
    mask = lmb(behav_df) & (~(np.isnan(stims)))
    stims = stims[mask]
    return mask, stims

def prepare_continuous_regressor_xs_ys(subj, time_steps, stim_name, lmb, return_behav_df=False):
    features, behav_df = load_subject_time_feature(subj, time_steps)
    mask, ys = convert_stim_to_y(behav_df, stim_name, lmb)
    features = features[mask]

    if not return_behav_df:
        return features, ys
    else:
        behav_df = behav_df[mask]
        return features, ys, behav_df

def svr_trigonometric_regression(X_train, y_train, X_test):
    cos_train, sin_train = degress_to_cos_sin(y_train)

    # Train SVR for cosine and sine components
    # fit the model
    svr_cos = SVR(kernel='rbf')
    svr_sin = SVR(kernel='rbf')

    svr_cos.fit(X_train, cos_train)
    svr_sin.fit(X_train, sin_train)

    # test the model
    cos_pred = svr_cos.predict(X_test)
    sin_pred = svr_sin.predict(X_test)
    y_pred = cos_sin_to_degrees(cos_pred, sin_pred)

    return y_pred


### Next, for the test data, compute its difference to each 'pattern'

### convert diffs to confidence

### demixture: decode the two distributions from it.