# Saved Snippets
This script holds miscellaneous code snippets that I may need again but that were cluttering the main scripts.

## Custom PLSRW

In [None]:
try:
    from scipy.linalg import pinv2
except ImportError:
    # pinv2 was removed in SciPy 1.9; pinv is the SVD-based replacement.
    from scipy.linalg import pinv as pinv2
from sklearn.metrics import r2_score

In [None]:
def _center_scale_xy(X, Y, scale=True):
    """ Center X, Y and scale if the scale parameter==True

    Returns
    -------
        X, Y, x_mean, y_mean, x_std, y_std
    """
    # center
    x_mean = X.mean(axis=0)
    X -= x_mean
    y_mean = Y.mean(axis=0)
    Y -= y_mean
    # scale
    if scale:
        x_std = X.std(axis=0, ddof=1)
        x_std[x_std == 0.0] = 1.0
        X /= x_std
        y_std = Y.std(axis=0, ddof=1)
        y_std[y_std == 0.0] = 1.0
        Y /= y_std
    else:
        x_std = np.ones(X.shape[1])
        y_std = np.ones(Y.shape[1])
    return X, Y, x_mean, y_mean, x_std, y_std


class PLSRW():
    """Single-target PLS-style regression with a distance-weighted penalty.

    For every component the weight vector is the regularised least-squares
    solution ``pinv(Xk.T @ Xk + reg * D.T @ D) @ Xk.T @ Yk``, where ``D`` is
    a diagonal matrix holding, for each feature, the Euclidean distance
    between that feature column and the target (see ``_calc_dist``).  The
    weight is normalised, scores and loadings are derived from it, and X and
    Y are deflated before the next component is extracted.

    Parameters
    ----------
    n_components : int, default=2
        Number of components to extract.
    scale : bool, default=True
        Whether X and Y are scaled to unit variance after centring.
    reg : float, default=0.01
        Multiplier of the distance penalty; 0 removes the penalty.
    verbose : bool, default=False
        Print array shapes while fitting.
    """

    def __init__(self, n_components=2, scale=True, reg=0.01, verbose=False):
        self.n_components = n_components
        self.scale = scale
        self.reg = reg
        self.verbose = verbose

    def _calc_dist(self, X, Y):
        """Return the Euclidean distance between each column of X and Y.

        Parameters
        ----------
        X : array of shape (n_samples, n_features)
        Y : array of shape (n_samples,) or (n_samples, n_targets)

        Returns
        -------
        ndarray of shape (n_features,)
            Entry ``j`` is the norm of ``Y - X[:, [j]]`` taken over all
            samples (and all target columns).
        """
        X = np.asarray(X)
        Y = np.asarray(Y)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)
        # Align the sample axis explicitly: (n, p, 1) - (n, 1, q).  Subtracting
        # a 1-D column from a 2-D (n, 1) target would instead broadcast into
        # an (n, n) matrix of all pairwise differences.
        diff = X[:, :, np.newaxis] - Y[:, np.newaxis, :]
        return np.sqrt((diff ** 2).sum(axis=(0, 2)))

    def fit(self, X, Y):
        """Fit the model on X (n_samples, n_features) and a single target Y.

        The inputs are copied to float64 first, so the caller's arrays are
        left untouched by centring, scaling and deflation.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If Y has more than one column; the weight normalisation below is
            only defined for one target.
        """
        import warnings

        # np.array copies, so the in-place steps below never reach the caller.
        X = np.array(X, dtype=np.float64)
        Y = np.array(Y, dtype=np.float64)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)
        if Y.shape[1] != 1:
            raise ValueError(
                "PLSRW supports a single target column; got Y with shape "
                f"{Y.shape}")

        n, p = X.shape
        q = Y.shape[1]

        n_components = self.n_components
        reg = self.reg
        eps = np.finfo(np.float64).eps
        verbose = getattr(self, "verbose", False)

        self.x_weights_ = np.zeros((p, n_components))  # U
        self._x_scores = np.zeros((n, n_components))  # Xi
        self.x_loadings_ = np.zeros((p, n_components))  # Gamma
        self.y_loadings_ = np.zeros((q, n_components))  # Delta

        # Centre/scale the private copies in place.
        Xk, Yk, self._x_mean, self._y_mean, self._x_std, self._y_std = (
            _center_scale_xy(X, Y, self.scale))
        # Flush numerically-zero target columns to exact zero.
        Yk_mask = np.all(np.abs(Yk) < 10 * eps, axis=0)
        Yk[:, Yk_mask] = 0.0

        for k in range(n_components):
            if (Yk ** 2).sum() < eps:
                # Nothing left to explain; remaining components stay zero.
                warnings.warn(
                    f"Y residual is constant at iteration {k}; "
                    "stopping early")
                break

            # Regularisation matrix: D.T @ D for diagonal D is diag(d ** 2).
            d = self._calc_dist(Xk, Yk)
            gram = Xk.T @ Xk + reg * np.diag(d ** 2)

            # PLSRW weight, normalised to (almost) unit length.
            x_weights = np.linalg.pinv(gram) @ (Xk.T @ Yk)
            x_weights /= np.linalg.norm(x_weights) + eps

            # Scores and loadings for this component.
            x_scores = Xk @ x_weights
            score_ss = float((x_scores ** 2).sum())
            if score_ss < eps:
                # Avoid 0/0 loadings once X has been fully deflated.
                warnings.warn(
                    f"X scores are null at iteration {k}; stopping early")
                break
            x_loadings = (Xk.T @ x_scores) / score_ss
            y_loadings = (Yk.T @ x_scores) / score_ss

            # Deflate X and Y
            Xk -= x_scores @ x_loadings.T
            Yk -= x_scores @ y_loadings.T
            if verbose:
                print("x_weights:", x_weights.shape)
                print("Xk:", Xk.shape, "Yk:", Yk.shape)

            self.x_weights_[:, k] = x_weights[:, 0]
            self._x_scores[:, k] = x_scores[:, 0]
            self.x_loadings_[:, k] = x_loadings[:, 0]
            self.y_loadings_[:, k] = y_loadings[:, 0]

        # Rotations map the (scaled) original X directly onto the scores.
        self.x_rotations_ = self.x_weights_ @ np.linalg.pinv(
            self.x_loadings_.T @ self.x_weights_)

        # Coefficients act on centred/scaled X and yield Y in original units
        # (before the mean is added back in ``predict``).
        self.coef_ = (self.x_rotations_ @ self.y_loadings_.T) * self._y_std
        if verbose:
            print("coef:", self.coef_.shape)
        return self

    def predict(self, X):
        """Predict targets for X, returning an array of shape (n_samples, 1).

        X is centred and scaled with the statistics stored by ``fit`` and the
        training mean of Y is added back; the input array is not modified.
        """
        X = (np.asarray(X, dtype=np.float64) - self._x_mean) / self._x_std
        return X @ self.coef_ + self._y_mean

    def score(self, X, y):
        """Return the R^2 of the predictions for X against y."""
        y_pred = self.predict(X)
        return r2_score(y, y_pred)

In [None]:
# Fit the custom PLSRW model and print its R^2 on the same data it was fitted
# on (i.e. a training score). `X` and `y` come from earlier notebook state.
# reg=0 multiplies the distance penalty by zero, so no regularisation is used.
plsrw = PLSRW(n_components=2, reg=0)
plsrw.fit(X, y)
print(plsrw.score(X, y))

## PLS Group-and-Average

In [None]:
def group_and_average(X, y, group_size=3, random_state=None):
    """Average consecutive samples in randomly composed groups.

    The samples are cut, in order, into sections of ``2 * group_size`` rows.
    The positions inside the sections are shuffled and every section is then
    halved, so each call yields different groups of ``group_size`` samples
    while each group still only mixes rows from the same section.  Finally
    the averaged samples are returned in random order.

    Parameters
    ----------
    X : array of shape (n_samples, ...)
    y : array of shape (n_samples, ...)
    group_size : int, default=3
        Number of samples averaged into one output sample.
    random_state : None, int or numpy seed-like, default=None
        Passed to ``np.random.default_rng``; set it for reproducible groups.

    Returns
    -------
    X_avg : ndarray with ``n_samples // group_size`` rows
    y_avg : ndarray with one mean per group

    Raises
    ------
    ValueError
        If ``group_size`` is smaller than 1 or the number of samples is not
        a positive multiple of ``2 * group_size``.
    """
    X = np.asarray(X)
    y = np.asarray(y)

    if group_size < 1:
        raise ValueError(f"group_size must be >= 1, got {group_size}")

    # Sections are twice the group size so that shuffling inside a section
    # can change which samples end up together.
    section_size = group_size * 2
    n_samples = X.shape[0]
    # Integer check: a float section count can hide a non-integral split
    # (e.g. 9 samples / 6 = 1.5) and silently give unequal groups.
    if n_samples == 0 or n_samples % section_size != 0:
        raise ValueError(
            f"number of samples ({n_samples}) must be a positive multiple "
            f"of 2 * group_size ({section_size})")

    rng = np.random.default_rng(random_state)
    sections = np.arange(n_samples).reshape(-1, section_size)
    # Shuffle positions within the sections (in place, along axis 1).
    rng.shuffle(sections, axis=1)

    # Row-major reshape halves every section: front half, then back half.
    groups = sections.reshape(-1, group_size)
    X_avg = np.array([X[group].mean(axis=0) for group in groups])
    y_avg = np.array([y[group].mean() for group in groups])

    # Return the averaged samples in random order, X and y kept aligned.
    order = rng.permutation(len(groups))
    return X_avg[order], y_avg[order]


def group_and_average_by_y(X, y, group_size=3):
    """Group-and-average after ordering the samples by ascending target.

    Sorting first means ``group_and_average`` only combines samples whose
    target values are neighbours in the sorted order.  X and y must support
    NumPy integer-array indexing.
    """
    ascending = np.argsort(y)
    return group_and_average(X[ascending], y[ascending], group_size)

In [None]:
# Evaluate PLS regression on group-averaged data: 10 random groupings, each
# scored on 10 random 70/30 train/test splits (100 fits in total).
# `X`, `y`, `selected_measure`, `stats` and the scikit-learn names used below
# come from earlier notebook state.
num_component = 2
train_scores, test_scores = [], []
pearsonr_scores, spearmanr_scores = [], []
coefs = []

for j in range(0, 10):
    # New random grouping (sorted by target first) on every outer pass.
    X_group, y_group = group_and_average_by_y(X, y)
    # NOTE(review): the feature selector is fitted on the whole grouped set
    # before the train/test split below, so test targets influence which
    # features are kept — confirm this leakage is acceptable.
    mi = SelectKBest(mutual_info_regression, k=3000)
    X_group = mi.fit_transform(X_group, y_group)
    
    for i in range(0, 10):
        X_train, X_test, y_train, y_test = train_test_split(
            X_group, y_group, test_size=0.3, shuffle=True)

        pls = PLSRegression(n_components=num_component)
        pls.fit(X_train, y_train)

        y_train_pred = pls.predict(X_train)
        # Keep only the first output column so the correlations below get
        # 1-D input.
        y_test_pred = pls.predict(X_test)[:, 0]

        train_scores.append(r2_score(y_train, y_train_pred))
        test_scores.append(r2_score(y_test, y_test_pred))
        pearsonr_scores.append(stats.pearsonr(y_test, y_test_pred)[0])
        spearmanr_scores.append(stats.spearmanr(y_test, y_test_pred)[0])
        coefs.append(pls.coef_)

# Average every metric (and the coefficient arrays) over all fits.
avg_train_score, avg_test_score = np.mean(train_scores), np.mean(test_scores)
avg_pearsonr, avg_spearmanr = np.mean(pearsonr_scores), np.mean(spearmanr_scores)
avg_coef = np.mean(coefs, axis=0)

print(f'Measure: {selected_measure}')
print(f"Avg train score: {avg_train_score:.3f}")
print(f"Avg test score: {avg_test_score:.3f}")
print(f"Avg pearson: {avg_pearsonr:.3f}")
print(f"Avg spearman: {avg_spearmanr:.3f}")

## PLS Iteration with Percentage Split

In [None]:
# Repeated random 70/30 hold-out evaluation (1000 splits) of `pipe`.
# `pipe` is expected to be a scikit-learn Pipeline created in an earlier cell
# whose final step exposes `coef_` (assumed to be a PLSRegression — confirm);
# `X`, `y`, `stats` and the scikit-learn helpers also come from notebook state.
num_component = 2
train_scores, test_scores = [], []
pearsonr_scores, spearmanr_scores = [], []
coefs = []

for i in range(0, 1000):
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, shuffle=True)

    pipe.fit(X_train, y_train)
    y_train_pred = pipe.predict(X_train)
    # Keep only the first output column so the correlations get 1-D input.
    y_test_pred = pipe.predict(X_test)[:, 0]

    train_scores.append(r2_score(y_train, y_train_pred))
    test_scores.append(r2_score(y_test, y_test_pred))
    pearsonr_scores.append(stats.pearsonr(y_test, y_test_pred)[0])
    spearmanr_scores.append(stats.spearmanr(y_test, y_test_pred)[0])
    # Record the coefficients of the estimator fitted in this iteration (the
    # pipeline's last step), not those of an unrelated `pls` object.
    coefs.append(pipe[-1].coef_)

    if i == 0:
        # Show the split sizes once.
        print(X_train.shape, X_test.shape)

# Average every metric (and the coefficient arrays) over all splits.
avg_train_score, avg_test_score = np.mean(train_scores), np.mean(test_scores)
avg_pearsonr, avg_spearmanr = np.mean(pearsonr_scores), np.mean(spearmanr_scores)
avg_coef = np.mean(coefs, axis=0)

print(f"Avg train score: {avg_train_score:.4f}")
print(f"Avg test score: {avg_test_score:.4f}")
print(f"Avg pearson: {avg_pearsonr:.4f}")
print(f"Avg spearman: {avg_spearmanr:.4f}")

## Expanded PLS Regression using K-fold Cross-validation

In [None]:
%%time
# 10-fold cross-validation (one repeat, fixed seed) of a
# StandardScaler -> PLSRegression pipeline. `X`, `y`, `selected_target`,
# `stats` and the scikit-learn names come from earlier notebook state.
rkf = RepeatedKFold(n_splits=10, n_repeats=1, random_state=251183)
train_scores, test_scores = [], []
pearsonr_scores, spearmanr_scores = [], []
coefs = []

for train_index, test_index in rkf.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    
    # A fresh pipeline per fold, so the scaler is fitted on the training
    # fold only.
    pipe = make_pipeline(StandardScaler(), PLSRegression(n_components=2))
    pipe.fit(X_train, y_train)
    y_train_pred = pipe.predict(X_train)
    # Keep only the first output column so the correlations get 1-D input.
    y_test_pred = pipe.predict(X_test)[:, 0]
    
    train_scores.append(r2_score(y_train, y_train_pred))
    test_scores.append(r2_score(y_test, y_test_pred))
    pearsonr_scores.append(stats.pearsonr(y_test, y_test_pred)[0])
    spearmanr_scores.append(stats.spearmanr(y_test, y_test_pred)[0])
    # 'plsregression' is the step name make_pipeline derives from the class.
    coefs.append(pipe['plsregression'].coef_)

# Average every metric (and the coefficient arrays) over the folds.
avg_train_score, avg_test_score = np.mean(train_scores), np.mean(test_scores)
avg_pearsonr, avg_spearmanr = np.mean(pearsonr_scores), np.mean(spearmanr_scores)
avg_coef = np.mean(coefs, axis=0)

print(f'Measure: {selected_target}')
print(f"Avg train score: {avg_train_score:.2f}")
print(f"Avg test score: {avg_test_score:.2f}")
print(f"Avg pearson: {avg_pearsonr:.2f}")
print(f"Avg spearman: {avg_spearmanr:.2f}")

## IQ Binning

In [None]:
# Split the data into 3 bins with `bin_by_feature` (defined elsewhere; the
# target `y` is passed both as the third argument and as the second).
# NOTE(review): each bin is indexed as bin[0] / bin[1] elsewhere in this
# file, presumably (X_bin, y_bin) — confirm against bin_by_feature.
bins = bin_by_feature(X, y, y, 3)
bin_1, bin_2, bin_3 = bins[0], bins[1], bins[2]
print(f'Bin 1: {bin_1[0].shape} | Bin 2: {bin_2[0].shape} | Bin 3: {bin_3[0].shape}')

## Add Noise

In [None]:
def generate_noise_samples(X, y, num_times):
    """Augment (X, y) with Gaussian-noise copies and return them shuffled.

    On every pass a noisy copy of the *current* X is appended, with zero-mean
    noise whose per-feature standard deviation equals that of the original X
    (computed once, before the loop).  y is appended to itself (flattened by
    ``np.append``) to match.  Because the current arrays already contain the
    earlier noisy copies, the row count doubles on every pass, ending at
    ``n_samples * 2 ** num_times``.

    NOTE(review): the exponential growth may be unintended if the goal was
    ``num_times`` noisy copies of the original data — confirm.
    """
    feature_std = np.std(X, axis=0)

    for _ in range(num_times):
        noise = np.random.normal(0, feature_std, X.shape)
        X = np.concatenate((X, X + noise), axis=0)
        y = np.append(y, y)

    # `shuffle` is resolved from the surrounding notebook (presumably
    # sklearn.utils.shuffle, which permutes X and y consistently).
    return shuffle(X, y)

## Display PLS plots during or after training

In [None]:
# Fit PLS regression on 1000 random 70/30 splits and report the mean R^2.
# `X`, `y` and `num_component` come from earlier notebook state. The
# commented-out lines optionally draw a 3x3 grid of test-set scatter plots
# (one panel every 10th iteration).
train_scores = []
test_scores = []
# fig, axs = plt.subplots(3, 3, figsize=(15, 15))
# x_fig, y_fig = 0, 0

for i in range(0, 1000):
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3)
    
    pls = PLSRegression(n_components=num_component)
    pls.fit(X_train, y_train)

    y_train_pred = pls.predict(X_train)
    y_test_pred = pls.predict(X_test)
    
    train_scores.append(r2_score(y_train, y_train_pred))
    test_scores.append(r2_score(y_test, y_test_pred))
    
#     if (i % 10 == 0):
#         axs[y_fig, x_fig].scatter(y_test, y_test_pred, alpha=0.3)
#         m, b, r, p, std_err = stats.linregress(y_test, y_test_pred[:,0])
#         axs[y_fig, x_fig].plot(y_test, (m * y_test) + b, alpha=0.3)
#         print(x_fig, y_fig, f"r:{r:.2f}", f"r^2:{test_scores[-1]:.2f}")
#         x_fig += 1
#         if (x_fig % 3 == 0):
#             x_fig = 0
#             y_fig += 1

# The split and predictions of the final iteration stay in the notebook
# namespace after the loop.
avg_train_score = np.mean(train_scores)
avg_test_score = np.mean(test_scores)

print("Train r^2:", avg_train_score)
print("Test r^2:", avg_test_score)
# plt.show()

In [None]:
# Fit a straight line through (true value, predicted value) pairs for the
# train and test splits currently in the notebook namespace, and report the
# R^2 of each line.
# LinearRegression needs a 2-D feature array, hence the reshape to a column.
y_train = y_train.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

# The true values are the regressor and the model predictions the response.
lin_reg_train = LinearRegression().fit(y_train, y_train_pred)
y_train_pred_lin_reg = lin_reg_train.predict(y_train)

lin_reg_test = LinearRegression().fit(y_test, y_test_pred)
y_test_pred_lin_reg = lin_reg_test.predict(y_test)

# score() here is the R^2 of the fitted line against the predictions, not
# the R^2 of the predictions against the true values.
lin_reg_train_score = lin_reg_train.score(y_train, y_train_pred)
lin_reg_test_score = lin_reg_test.score(y_test, y_test_pred)
print("Train r^2:", lin_reg_train_score)
print("Test r^2:", lin_reg_test_score)

In [None]:
# Scatter the test-set predictions against the true values and overlay the
# least-squares line fitted through them. `y_test`, `y_test_pred` and
# `selected_target` come from earlier notebook state.
y_true = y_test.reshape(-1, 1)
y_pred = y_test_pred.reshape(-1, 1)

# The true values are the regressor and the predictions the response.
lr = LinearRegression().fit(y_true, y_pred)
lr_pred = lr.predict(y_true)
# First value: R^2 of the model predictions; second: R^2 of the fitted line.
print(r2_score(y_true, y_pred), r2_score(y_true, lr_pred))

plt.scatter(y_true, y_pred, alpha=0.3)
plt.plot(y_true, lr_pred)
# The data plotted here is the test split, so label it as such.
plt.title("Testing Set")
plt.xlabel(f'True {selected_target}')
plt.ylabel(f'Predicted {selected_target}')
plt.show()

In [None]:
# Training split: predicted vs. true values, with the regression line stored
# in `y_train_pred_lin_reg` by an earlier cell drawn on top.
plt.scatter(y_train, y_train_pred, alpha=0.3, color='black')
plt.plot(y_train, y_train_pred_lin_reg, color='#897B61')
plt.title("Training Set")
plt.xlabel(f'True {selected_measure}')
plt.ylabel(f'Predicted {selected_measure}')
# plt.annotate(f"r-squared = {avg_train_score:.3f}", (6, 16))
plt.show()

In [None]:
# Test split: predicted vs. true values, with the regression line stored in
# `y_test_pred_lin_reg` by an earlier cell drawn on top.
plt.scatter(y_test, y_test_pred, alpha=0.3, color='black')
plt.plot(y_test, y_test_pred_lin_reg, color='#897B61')
plt.title("Testing Set")
plt.xlabel(f'True {selected_measure}')
plt.ylabel(f'Predicted {selected_measure}')
# plt.annotate(f"r-squared = {lin_reg_test_score:.2f}", (60, 87))
plt.show()

In [None]:
# Histogram of the per-split test scores collected in `test_scores` by an
# earlier cell.
plt.figure(1, figsize=(10, 10))
plt.hist(x=test_scores, rwidth=0.95)
plt.title("Test Score Distribution")
plt.xlabel('Score')
plt.ylabel('Number of Scores')
plt.show()

## Display Yeo FC

In [None]:
# To display each class of connections (within and between)
# `subject_fc`, `fpn_indices` and `dmn_indices` come from earlier notebook
# state.
# NOTE(review): every imshow call draws onto the same figure, so these look
# like alternatives meant to be run one at a time — confirm.
plt.figure(figsize=(10, 10))
# Full connectivity matrix.
plt.imshow(subject_fc)
# Copy the entries selected by fpn_indices into an 11x11 zero matrix.
a = np.zeros((11, 11))
a[fpn_indices] = subject_fc[fpn_indices]
plt.imshow(a)
# Place the entries selected by dmn_indices in the strict upper triangle of
# an 8x8 zero matrix.
b = np.zeros((8, 8))
b[np.triu_indices(8, k=1)] = subject_fc[dmn_indices]
plt.imshow(b)
# Block of rows 0-10 against columns 11 onwards.
plt.imshow(subject_fc[:11, 11:])

## Plot Brain Images

In [None]:
# For plotting other functional images
# `image` is presumably nilearn.image (confirm); `subject_path`, `subject`
# and `subject_niftis` come from earlier notebook state.
# NOTE(review): `img` is reassigned three times, so only the last assignment
# survives — these look like alternatives to pick from.
img = image.load_img(f'{subject_path}/wr{subject}_task-movieDM_bold_0375.nii')
concat_img = image.concat_imgs(subject_niftis)
# Option: the first image of the concatenated series.
img = image.index_img(concat_img, 0)
# Option: the mean image of the concatenated series.
img = image.mean_img(concat_img)

## Feature Selection (Poor Performance)

### Variance Threshold

In [None]:
# Filter features with VarianceThreshold at 0.055; note that `X` is
# overwritten with the reduced matrix.
sel = VarianceThreshold(0.055)
X = sel.fit_transform(X)
print("X shape:", X.shape)

### Select k strongest connections

In [None]:
def score_fc(X, y):
    """Score each feature by the absolute value of its column sum.

    The sign of the summed connectivity is discarded so that strongly
    negative and strongly positive features rank alike.  ``y`` is accepted
    only to match the ``score_func(X, y)`` signature and is not used.
    """
    column_totals = np.sum(X, axis=0)
    return np.abs(column_totals)

# Keep the 3400 features with the highest score_fc value; `X` is overwritten
# with the reduced matrix.
X = SelectKBest(score_fc, k=3400).fit_transform(X, y)
print("X shape:", X.shape)

### Display MI Before and After Transformation

In [None]:
# Mutual-information feature selection on the full data set (top 1000
# features) and on each of the three bins, with a different k per bin.
# Each bin is indexed as bin[0] for features and bin[1] for targets
# (presumably (X_bin, y_bin) — confirm against where the bins are built).
mi = SelectKBest(mutual_info_regression, k=1000)
X_mi = mi.fit_transform(X, y)
print("X_mi shape:", X_mi.shape)

mi_bin_1 = SelectKBest(mutual_info_regression, k=3000)
X_bin_1 = mi_bin_1.fit_transform(bin_1[0], bin_1[1])

mi_bin_2 = SelectKBest(mutual_info_regression, k=2000)
X_bin_2 = mi_bin_2.fit_transform(bin_2[0], bin_2[1])

mi_bin_3 = SelectKBest(mutual_info_regression, k=500)
X_bin_3 = mi_bin_3.fit_transform(bin_3[0], bin_3[1])

print(f'X_bin_1: {X_bin_1.shape} | X_bin_2: {X_bin_2.shape} | X_bin_3: {X_bin_3.shape}')

In [None]:
from common.plotting import create_power_fc_matrix, plot_fc_matrix, plot_fc_graph

# Plot the connectivity matrix of sample 1 before feature selection, then
# the same kind of plot after mapping a selected-feature row back to the
# full feature space with the `mi` selector (-1 and 1 are passed through to
# plot_fc_matrix).
# NOTE(review): `X_1` is not defined anywhere in this file; the selector `mi`
# produced `X_mi`, so this may have been meant to read X_mi[1] — confirm.
plot_fc_matrix(create_power_fc_matrix(X[1]), -1, 1)
plot_fc_matrix(create_power_fc_matrix(mi.inverse_transform(X_1[1].reshape(1, -1))), -1, 1)

## Get Diagnosis Data

In [None]:
# Fragment for collecting the clinician-consensus diagnosis column.
# `measure`, `demographics`, `wisc_labels` and `subject_id` come from the
# surrounding loop in the main script (not shown here).
demographic_measures = ['Age', 'Sex', 'Diagnosis']

if measure == 'Diagnosis':
    # The column label contains a comma and is looked up verbatim; the
    # f-string has no placeholders.
    demographics[measure].append(
        wisc_labels.at[subject_id, f'assessment Diagnosis_ClinicianConsensus,NoDX'])