In [1]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import pickle
import h5py
import seaborn as sns
import pandas as pd
from scipy.stats import pearsonr
root = os.path.dirname(os.path.abspath(os.curdir))
sys.path.append(root)

import torch
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
from torch.utils.data import Subset

from predify.utils.training import train_pcoders, eval_pcoders

from networks_2022 import BranchedNetwork
from data.CleanSoundsDataset import CleanSoundsDataset
from data.NoisyDataset import NoisyDataset

# Parameters

In [2]:
# Base directory; every other path below is a sub-folder built from it.
engram_dir = '/mnt/smb/locker/abbott-locker/hcnn/'
checkpoints_dir = engram_dir + 'checkpoints/'
tensorboard_dir = engram_dir + 'tensorboard/'
activations_dir = engram_dir + 'activations_pnet/'
pickles_dir = engram_dir + 'pickles/'

In [3]:
# Use the first CUDA device when one is visible, otherwise fall back to CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda:0'
else:
    DEVICE = 'cpu'
print(f'Device: {DEVICE}')

Device: cpu


# Distance functions

In [10]:
# A few distance and similarity metrics

def row_rms(A, B):
    """
    Mean of the row-wise RMS differences between A and B.

    1-D inputs: returns the RMS of ``A - B``.
    N-D inputs: for every index along the first axis, the RMS of
    ``A[idx] - B[idx]`` is taken over the last axis; all resulting values
    are then averaged into a single scalar.

    Parameters
    ----------
    A, B : numpy.ndarray, torch.Tensor or array-like
        Inputs to compare. Tensors are detached and copied to the CPU before
        conversion, so CUDA tensors and tensors that require grad are
        accepted. Values are cast to float64.

    Returns
    -------
    numpy.float64
        The averaged RMS value.
    """

    # Tensor.numpy() refuses CUDA tensors and tensors that track gradients,
    # hence the explicit detach/cpu round trip.
    if torch.is_tensor(A):
        A = A.detach().cpu().numpy()
    if torch.is_tensor(B):
        B = B.detach().cpu().numpy()
    A = np.asarray(A, dtype=float)
    B = np.asarray(B, dtype=float)

    if A.ndim == 1:
        return np.sqrt(np.mean(np.square(A - B), axis=0))

    # One RMS (or vector of RMS values) per leading index, reduced over the
    # last axis of each slice.
    per_row = [
        np.sqrt(np.mean(np.square(A[idx] - B[idx]), axis=-1))
        for idx in range(A.shape[0])
    ]
    return np.mean(per_row)

def rms(A, B):
    """
    Root-mean-square difference between the flattened inputs.

    Parameters
    ----------
    A, B : numpy.ndarray, torch.Tensor or array-like
        Inputs with the same number of elements. Tensors are detached and
        copied to the CPU before conversion, so CUDA tensors and tensors
        that require grad are accepted. Values are cast to float64.

    Returns
    -------
    numpy.float64
        ``sqrt(mean((A - B) ** 2))`` over all elements.
    """

    # Tensor.numpy() refuses CUDA tensors and tensors that track gradients.
    if torch.is_tensor(A):
        A = A.detach().cpu().numpy()
    if torch.is_tensor(B):
        B = B.detach().cpu().numpy()

    diff = np.asarray(A, dtype=float).flatten() - np.asarray(B, dtype=float).flatten()
    return np.sqrt(np.mean(np.square(diff)))

def tanimoto_distance(A, B):
    """
    Tanimoto coefficient of the flattened inputs.

    Despite the name, the value returned is the similarity
    ``A.B / (|A|^2 + |B|^2 - A.B)``: identical vectors give 1 and
    orthogonal vectors give 0. It is not subtracted from 1.

    Parameters
    ----------
    A, B : numpy.ndarray, torch.Tensor or array-like
        Inputs with the same number of elements. Tensors are detached and
        copied to the CPU before conversion, so CUDA tensors and tensors
        that require grad are accepted. Values are cast to float64.

    Returns
    -------
    numpy.float64
        The Tanimoto coefficient; nan when both inputs are all zeros
        (the denominator is then 0).
    """

    # Tensor.numpy() refuses CUDA tensors and tensors that track gradients.
    if torch.is_tensor(A):
        A = A.detach().cpu().numpy()
    if torch.is_tensor(B):
        B = B.detach().cpu().numpy()
    A = np.asarray(A, dtype=float).flatten()
    B = np.asarray(B, dtype=float).flatten()

    overlap = np.dot(A, B)
    return overlap / (np.linalg.norm(A) ** 2 + np.linalg.norm(B) ** 2 - overlap)
    
def cosine_similarity(A, B):
    """
    Cosine similarity of the flattened inputs.

    Parameters
    ----------
    A, B : numpy.ndarray, torch.Tensor or array-like
        Inputs with the same number of elements. Tensors are detached and
        copied to the CPU before conversion, so CUDA tensors and tensors
        that require grad are accepted. Values are cast to float64.

    Returns
    -------
    numpy.float64
        ``A.B / (|A| * |B|)``; nan when either input is all zeros
        (the denominator is then 0).
    """

    # Tensor.numpy() refuses CUDA tensors and tensors that track gradients.
    if torch.is_tensor(A):
        A = A.detach().cpu().numpy()
    if torch.is_tensor(B):
        B = B.detach().cpu().numpy()
    A = np.asarray(A, dtype=float).flatten()
    B = np.asarray(B, dtype=float).flatten()

    # Both operands are 1-D after flattening, so a single dot product covers
    # every input shape; no per-channel handling is needed.
    return np.dot(A, B) / (np.linalg.norm(A) * np.linalg.norm(B))

from scipy.stats import pearsonr
def pearsonr_sim(A, B):
    """
    Pearson correlation coefficient of the flattened inputs.

    Parameters
    ----------
    A, B : numpy.ndarray, torch.Tensor or array-like
        Inputs with the same number of elements. Tensors are detached and
        copied to the CPU before conversion, so CUDA tensors and tensors
        that require grad are accepted. Values are cast to float64.

    Returns
    -------
    float
        The correlation coefficient from ``scipy.stats.pearsonr``; the
        accompanying p-value is discarded.
    """
    # Tensor.numpy() refuses CUDA tensors and tensors that track gradients.
    if torch.is_tensor(A):
        A = A.detach().cpu().numpy()
    if torch.is_tensor(B):
        B = B.detach().cpu().numpy()
    A = np.asarray(A, dtype=float).flatten()
    B = np.asarray(B, dtype=float).flatten()

    pear, _ = pearsonr(A, B)
    return pear

# Function to collect correlations

In [5]:
def eval_correlations(results, dist_func, n_timesteps=5, layers=None, rng=None):
    """
    Score every noisy activation against a randomly chosen clean activation.

    For each sample index, timestep and layer, the noisy activation of that
    sample is compared (via ``dist_func``) with the clean activation of an
    index drawn uniformly at random from all samples.

    Parameters
    ----------
    results : mapping
        Indexable by string key (e.g. an open ``h5py.File``). Must provide
        ``'label'`` plus ``'{layer}_{t}_activations'`` and
        ``'{layer}_{t}_clean_activations'`` for every layer and timestep.
    dist_func : callable
        ``dist_func(noisy, clean)`` returning the score to record.
    n_timesteps : int, optional
        Number of timesteps ``t = 0 .. n_timesteps - 1`` to evaluate.
    layers : sequence of str, optional
        Layer names; defaults to
        ``['conv1', 'conv2', 'conv3', 'conv4_W', 'fc6_W']``.
    rng : object with a ``choice`` method, optional
        E.g. ``numpy.random.default_rng(seed)``, for a reproducible shuffle.
        When omitted, NumPy's global random state is used.

    Returns
    -------
    corr_shuffle, timestep, layer : list
        Three parallel lists with one entry per (sample, timestep, layer),
        ordered with sample outermost and layer innermost.
    """
    if layers is None:
        layers = ['conv1', 'conv2', 'conv3', 'conv4_W', 'fc6_W']
    # Falling back to the np.random module keeps the default behaviour
    # (including np.random.seed control) identical for existing callers.
    sampler = np.random if rng is None else rng

    # Only the number of labels is needed, to enumerate the samples.
    idxs = np.arange(np.array(results['label']).size)

    corr_shuffle = []
    timestep = []
    layer = []

    for i in idxs:
        for t in range(n_timesteps):
            for l in layers:
                noisy_activ = results[f'{l}_{t}_activations'][i]
                # Drawn from all indices, so the partner may occasionally be
                # sample i itself.
                partner = sampler.choice(idxs)
                clean_activ = results[f'{l}_{t}_clean_activations'][partner]
                corr_shuffle.append(dist_func(noisy_activ, clean_activ))
                timestep.append(t)
                layer.append(l)

    return corr_shuffle, timestep, layer

# Background noise, incorrect dataset

In [6]:
# Silences every warning category for the remainder of the kernel session.
# This is bad practice (it also hides deprecation and numerical warnings),
# but the warnings emitted during the analysis are very noisy.
import warnings
warnings.filterwarnings("ignore")

In [11]:
# Metric applied to each (noisy, clean) activation pair.
dist_func = pearsonr_sim
# Prefix of the pickle files the results are written to.
file_prefix = 'shuffle_pearsonr'

In [12]:
# Background-noise conditions; used to build the activation file names.
bgs = [
    'AudScene',
    'Babble8Spkr',
]
# SNR values; used to build both the input and the output file names.
snrs = [
    3.0,
]

In [None]:
# For every background/SNR condition: load the stored activations, compute
# the shuffled-pair scores, and pickle them.
for bg in bgs:
    for snr in snrs:
        # NOTE(review): the input name uses int(snr) ("snr3") while the
        # pickle name below uses the raw value ("snr3.0") -- confirm intended.
        results_path = f'{activations_dir}{bg}_snr{int(snr)}.hdf5'
        # The context manager closes the HDF5 handle after each condition
        # (and on error) instead of leaving one open file per iteration.
        with h5py.File(results_path, 'r') as results:
            corr_shuffle, timestep, layer = eval_correlations(
                results, dist_func
                )
        with open(f'{pickles_dir}{file_prefix}_{bg}_snr{snr}.p', 'wb') as f:
            pickle.dump({
                'Corr Shuffle': corr_shuffle,
                'Timestep': timestep,
                'Layer': layer
                }, f)