## Sandbox notebook


In [None]:
import os
import time 
import shutil 
import numpy as np
import pandas as pd
pd.set_option("display.precision", 20)

from lib.script_01_00 import generate_initial_variables
from lib.script_01_01 import generate_raw_confounds
from lib.script_01_02 import generate_nonlin_confounds

from src.nets.nets_load_match import nets_load_match
from src.nets.nets_inverse_normal import nets_inverse_normal 
from src.nets.nets_normalise import nets_normalise 
from src.nets.nets_demean import nets_demean
from src.nets.nets_deconfound import nets_deconfound

from src.duplicate.duplicate_categorical import duplicate_categorical
from src.duplicate.duplicate_demedian_norm_by_site import duplicate_demedian_norm_by_site

from src.preproc.datenum import datenum
from src.preproc.days_in_year import days_in_year

from src.memmap.MemoryMappedDF import MemoryMappedDF
from src.memmap.read_memmap_df import read_memmap_df
from src.memmap.addBlockToMmap import addBlockToMmap

In [None]:
# Data directory (hard-coded absolute path)
data_dir = '/well/win/projects/ukbiobank/fbp/confounds/data/72k_data/'

# Output directory (will eventually be equal to data_dir)
out_dir = '/well/nichols/users/inf852/confounds/data/'

In [None]:

# Load each saved memory map from the 'saved_memmaps' folder under the
# current working directory
memmap_dir = os.path.join(os.getcwd(), 'saved_memmaps')

IDPs = read_memmap_df(os.path.join(memmap_dir, 'IDPs.npz'))
nonIDPs = read_memmap_df(os.path.join(memmap_dir, 'nonIDPs.npz'))
misc = read_memmap_df(os.path.join(memmap_dir, 'misc.npz'))
confounds = read_memmap_df(os.path.join(memmap_dir, 'confounds.npz'))

In [None]:
import os
import numpy as np
import pandas as pd
from src.nets.nets_svd import nets_svd
from src.nets.nets_demean import nets_demean
from src.memmap.MemoryMappedDF import MemoryMappedDF
from src.nantools.all_non_nan_inds import all_non_nan_inds
from src.nantools.create_nan_patterns import create_nan_patterns


In [None]:
from src.nets.nets_deconfound_once import inside_loop

In [None]:
# Cluster settings; 'num_nodes' is the value later passed to cluster.scale
cluster_cfg = {'cluster_type':'local','num_nodes':12}

In [None]:
# NOTE(review): `i` is rebound by the loops further down; the purpose of this
# initial value is unclear.
i = 3
# Index the IDP and confound memory maps with [:,:]
# (presumably reads every row and column into memory - confirm)
y = IDPs[:,:]
conf = confounds[:,:]
# Method name passed on to inside_loop
mode = 'nets_svd'
# Whether the deconfounding code should demean
demean = True
# dtype used for the deconfounded output
dtype = 'float64'
# Whether the confounds contain NaNs
conf_has_nans = False
# When False, the submission loop treats every column of y as its own pattern
check_nan_patterns=False

In [None]:

# Start a local Dask cluster, attach a client to it, then scale it
from dask.distributed import Client, LocalCluster, as_completed

# Create the cluster with its default settings
cluster = LocalCluster()

# Attach a client to the cluster
client = Client(cluster)

# Scale to the size requested in the cluster configuration
requested_size = cluster_cfg['num_nodes']
cluster.scale(requested_size)


In [None]:
cluster

In [None]:


# # Save original index
# original_index = y.index

# # Check if confounds have NaNs
# if conf_has_nans is None:
    
#     # If the type is memory mapped
#     if type(conf)==MemoryMappedDF:

#         # Work out if the confounds have nans
#         conf_has_nans = conf[:,:].isna().sum().sum()

#     else:
#         # Work out if the confounds have nans
#         conf_has_nans = conf.isna().sum().sum()

# # If the confounds have nans
# if conf_has_nans:
    
#     # If the type is memory mapped
#     if type(conf)==MemoryMappedDF:
        
#         # We are trying to avoid reading everything in at once
#         conf_non_nan_inds = all_non_nan_inds(conf, safeMode=True)
        
#     else:
        
#         # Otherwise, we can get the indices for non-nan rows in conf directly
#         conf_non_nan_inds = all_non_nan_inds(conf)

#     # Reduce conf and y down, ignoring the nan rows for conf
#     conf = conf[conf_non_nan_inds]
#     y = y[conf_non_nan_inds]
    
#     # If we have subset the data we need to demean again
#     if demean:
        
#         # Demean y and conf
#         y = nets_demean(y)
#         conf = nets_demean(conf)


# # If we are checking unique nan patterns record the number of them
# if check_nan_patterns:
        
#     # We now need to get the nan-patterns for y (we don't include
#     # columns with 5 or less values).
#     nan_patterns_y = create_nan_patterns(y, thresh=5)
    
#     # Number of columns which meet our nan-thresholding requirements
#     n_cols = len([j for i in nan_patterns_y for j in nan_patterns_y[i]['columns']])

# # Else, we just set n_cols to the number of columns in y for now and fix at the end
# else:

#     # Set number of columns
#     n_cols = y.shape[1]

# # Initialize empty nan dataframe
# y_deconf = pd.DataFrame(np.zeros((y.shape[0],n_cols),dtype=dtype),index=y.index)

# # Set column headers
# if check_nan_patterns:
    
#     # We're only including column names for the variables that were not removed during nan pattern
#     # identification.
#     y_deconf.columns = [j for i in nan_patterns_y for j in nan_patterns_y[i]['columns']]

# else:
    
#     # Copy from y
#     y_deconf.columns = y.columns
    
# # Change types to memory mapped dfs
# MemoryMappedDF(y).save(os.path.join(os.getcwd(),'temp_mmap','y.npz'))
# MemoryMappedDF(conf).save(os.path.join(os.getcwd(),'temp_mmap','conf.npz'))

# # Scatter the data across the workers
# scattered_y = client.scatter(os.path.join(os.getcwd(),'temp_mmap','y.npz'))
# scattered_conf = client.scatter(os.path.join(os.getcwd(),'temp_mmap','conf.npz'))
# mode = client.scatter(mode)

# t1 = time.time()

# # Empty futures list
# futures = []

# # If we are checking unique nan patterns record the number of them
# if check_nan_patterns:

#     # Number of patterns
#     num_patterns = len(nan_patterns_y)

# # Otherwise we need to loop through our variables one by one
# else:

#     # Treating each variable as though it has its own unique pattern
#     num_patterns = y.shape[1]
    
# Submit one inside_loop job per nan pattern (or per column of y when
# check_nan_patterns is False), wait for all of them, and report the time.
#
# NOTE(review): this code relies on names that are only assigned in the
# commented-out setup code above (num_patterns, nan_patterns_y, futures,
# scattered_y, scattered_conf, t1). Unless they already exist in the running
# session, this cell raises NameError.

# Loop through all unique nan patterns in y
for i in range(num_patterns):

    print('Deconfounding: ', i+1, '/', num_patterns)

    # If we have a pattern, use it
    if check_nan_patterns:
        
        # Get the pattern and columns (the stored pattern is inverted so that
        # True marks non-nan entries)
        non_nan = ~np.array(nan_patterns_y[i]['pattern'],dtype=bool)
        columns = nan_patterns_y[i]['columns']

    # Otherwise set to none
    else:

        # Empty pattern and current column
        non_nan = None
        columns = [y.columns[i]]
    
    # Submit a job to the local cluster
    future_i = client.submit(inside_loop, scattered_y, scattered_conf, 
                             columns, mode, non_nan, pure=False)

    # Append to list 
    futures.append(future_i)


# Completed jobs
completed = as_completed(futures)

# Wait for results (note: this rebinds `i` to each completed future)
for i in completed:
    i.result()

t2 = time.time()
print('dask time: ', t2-t1)

# Delete the future objects (NOTE: see above comment in setup section).
# NOTE(review): if no job was submitted, future_i is never assigned here and
# this statement fails unless it is left over from an earlier run.
del i, completed, futures, future_i


# # Get the list of columns in y that are also in y_deconf
# common_columns = [col for col in y.columns if col in y_deconf.columns]

# # Reorder y_deconf columns to match the order of common columns in y
# y_deconf = y_deconf[common_columns]
    
# # Initialise output dataframe
# deconf_out = pd.DataFrame(index=original_index,columns=y_deconf.columns,dtype=dtype)

# # Restore the nan rows
# if conf_has_nans:
#     deconf_out[conf_non_nan_inds] = np.array(y_deconf.values,dtype=dtype)
# else:
#     deconf_out[:] = np.array(y_deconf.values,dtype=dtype)


In [None]:
future


In [None]:
columns, mode, non_nan

In [None]:
col

In [None]:
np.isnan(x[:, 0]).any()

In [None]:
def all_non_nan_inds(x, safeMode=False):
    """Return a boolean mask marking the rows of `x` that contain no NaNs.

    Parameters
    ----------
    x : array-like or MemoryMappedDF
        Two-dimensional data. In safe mode it must support `x.shape` and
        `x[:, col]` indexing with an integer column position.
    safeMode : bool, optional
        If False (default), all of the data is checked in one go; a
        MemoryMappedDF is first read in full via `x[:,:].values`.
        If True, the data is read one column at a time so that the whole of
        `x` never has to be held in memory at once.

    Returns
    -------
    Boolean mask with one entry per row of `x`; True where the row has no
    NaN in any column. Both modes return a row mask, so the result can be
    used to subset rows regardless of `safeMode`.
    """

    # If we aren't in safe mode just read everything in.
    if not safeMode:

        # If the type is memory mapped, get the values
        if isinstance(x, MemoryMappedDF):
            x = x[:,:].values

        return ~np.isnan(x).any(axis=1)

    # Assume we can't load all data in at once: keep a running per-row flag
    # recording whether any column seen so far held a NaN in that row.
    row_has_nan = np.zeros(x.shape[0], dtype=bool)

    # Loop through columns one by one
    for col in range(x.shape[1]):

        # np.asarray accepts both a DataFrame/Series column and a plain
        # array; ravel flattens an (n, 1) column to length n.
        column_values = np.asarray(x[:, col])
        row_has_nan |= np.isnan(column_values).ravel()

    # Rows without any NaN
    return ~row_has_nan

In [None]:
import numpy as np
import time
from src.nets.nets_svd import nets_svd

# Random 60000 x 400 test matrix
tmp = np.random.randn(60000,400)


# Time numpy's SVD (full_matrices=False) on the test matrix
t1 = time.time()
u,d,v = np.linalg.svd(tmp, full_matrices=False)
t2 = time.time()
print(t2-t1)

# Time nets_svd (reorder=False) on the same matrix for comparison
t1 = time.time()
u2,d2,v2 = nets_svd(tmp, reorder=False)
t2 = time.time()
print(t2-t1)

In [None]:
np.amax(np.abs(tmp - (u @ np.diag(d) @ v))), np.amax(np.abs(tmp - (u2 @ np.diag(d2) @ v2)))

In [None]:
u2.shape

In [None]:
from dask.distributed import LocalCluster
cluster = LocalCluster()

In [None]:
cluster


In [None]:
import dask
dask.config.config

In [None]:
print(dask.config.paths)


In [None]:
from dask.distributed import Client, LocalCluster, as_completed

# Size the cluster is scaled to
num_nodes = 12

# Create the local cluster and attach a client to it
cluster = LocalCluster()
client = Client(cluster)

# Scale the cluster to the requested size
cluster.scale(num_nodes)
    

In [None]:
MARKER DASK

In [None]:

# Notebook-level run of the deconfounding setup: drop rows where the
# confounds have NaNs, work out the nan patterns of y, then submit one
# inside_loop job per pattern to the dask client.
y = IDPs[:,:]
conf = confounds[:,:]
mode = 'nets_svd'
# Plain boolean; a trailing comma here would bind the tuple (True,) instead
demean = True
dtype = 'float64'
conf_has_nans = None

t1 = time.time()

# Save original index
original_index = y.index

# Check if confounds have NaNs
if conf_has_nans is None:

    # If the type is memory mapped
    if isinstance(conf, MemoryMappedDF):

        # Work out if the confounds have nans
        conf_has_nans = conf[:,:].isna().sum().sum()

    else:
        # Work out if the confounds have nans
        conf_has_nans = conf.isna().sum().sum()

# If the confounds have nans
if conf_has_nans:

    # If the type is memory mapped
    if isinstance(conf, MemoryMappedDF):

        # We are trying to avoid reading everything in at once
        conf_non_nan_inds = all_non_nan_inds(conf, safeMode=True)

    else:

        # Otherwise, we can get the indices for non-nan rows in conf directly
        conf_non_nan_inds = all_non_nan_inds(conf)

    # Reduce conf and y down, ignoring the nan rows for conf
    conf = conf[conf_non_nan_inds]
    y = y[conf_non_nan_inds]

    # If we have subset the data we need to demean again
    if demean:

        # Demean y and conf
        y = nets_demean(y)
        conf = nets_demean(conf)

# We now need to get the nan-patterns for y
nan_patterns_y = create_nan_patterns(y)

t2 = time.time()

print('init time: ', t2-t1)

# Empty futures list
futures = []

# Submit jobs
for i in nan_patterns_y:

    print('Deconfounding: ', i+1, '/', len(nan_patterns_y))

    # Get the pattern (inverted so that True marks non-nan entries)
    non_nan = ~np.array(nan_patterns_y[i]['pattern'],dtype=bool)

    # Only submit patterns with more than 5 non-nan values
    if np.sum(1*non_nan) > 5:

        # Run the i^{th} job.
        # NOTE(review): other calls to inside_loop in this notebook pass
        # (y, conf, columns, mode, non_nan); confirm this argument order
        # against the current inside_loop signature.
        future_i = client.submit(inside_loop,
                                 y[nan_patterns_y[i]['columns']],
                                 conf, non_nan, mode, pure=False)

        # Append to list
        futures.append(future_i)

# Completed jobs (the results are not waited on or collected in this cell)
completed = as_completed(futures)

    
# # Get the list of columns in y that are also in y_deconf
# common_columns = [col for col in y.columns if col in y_deconf.columns]

# # Reorder y_deconf columns to match the order of common columns in y
# y_deconf = y_deconf[common_columns]
    
# # Remove columns where all values are NaN
# y_deconf = y_deconf.dropna(axis=1, how='all')

# # Initialise output dataframe
# deconf_out = pd.DataFrame(index=original_index,columns=y_deconf.columns,dtype=dtype)

# # Restore the nan rows
# if conf_has_nans:
#     deconf_out[conf_non_nan_inds] = np.array(y_deconf.values,dtype=dtype)
# else:
#     deconf_out[:] = np.array(y_deconf.values,dtype=dtype)


In [None]:
from src.preproc.switch_type import switch_type

#nonIDPs.save(os.path.join(os.getcwd(),'tmp_file2'))
x=os.path.join(os.getcwd(),'tmp_file2')


In [None]:
x=switch_type(x,'pandas')

In [None]:
x

In [None]:

from src.memmap.read_memmap_df import read_memmap_df
y = read_memmap_df(x)

In [None]:
y[:,:]

In [None]:
b'\x94\x8c\x0f_unpickle_block\x94\x93\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x97h\xbd\x8c\x05dtype\x94\x93\x94\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bM\x8f\x0fJ\x8e\x07\x01\x00\x86\x94\x8c\x01F\x94t\x94R\x94\x8c\x08builtins\x94\x8c\x05slice\x94\x93\x94K\x00M\x8f\x0fK\x01\x87\x94R\x94K\x02\x87\x94R\x94\x85\x94]\x94(\x8c\x18'.decode()

In [None]:

with open('/well/nichols/users/inf852/confounds/tmp.txt','a') as f:
    print('here1', file=f)

In [None]:
future.visualise()

In [None]:

# Arguments for calling inside_loop directly: paths to y.npz and conf.npz
# under ./temp_mmap, the method name, a single column of y and no
# precomputed nan pattern. Nothing is scattered to workers here.
tmp_y = os.path.join(os.getcwd(),'temp_mmap','y.npz')
tmp_conf = os.path.join(os.getcwd(),'temp_mmap','conf.npz')
mode = 'nets_svd'
columns = [y.columns[10]]
non_nan = None

In [None]:
inside_loop(tmp_y, tmp_conf, columns, mode, non_nan)

In [None]:
# Change types to memory mapped dfs
y2 = switch_type(tmp_y,out_type="MemoryMappedDF")
conf2 = switch_type(tmp_conf,out_type="MemoryMappedDF")

# Get the y's we're interested in
y_current = y2[:,columns]

# If we don't have nans work them out
if non_nan is None:
    non_nan = ~np.array(y_current.isna().astype(int).values,dtype=bool)

In [None]:
conf2[:,:][non_nan]

In [None]:

y_current = y_current[non_nan]
conf_current = conf2[np.where(non_nan),:]

In [None]:
del y2, conf2

In [None]:
# type(y_current.isna().astype(int).tolist())
from src.preproc.switch_type import switch_type

In [None]:
# Step-by-step version of a single deconfounding job, run at notebook level.
# Relies on tmp_y, tmp_conf, columns, mode, non_nan, demean and dtype already
# being defined in the session. A checkpoint label is appended to a text file
# after each stage so progress can be followed from outside the notebook.

# File that progress checkpoints are appended to
checkpoint_file = '/well/nichols/users/inf852/confounds/tmp.txt'


def _checkpoint(label):
    """Append `label` as one line to the checkpoint file."""
    with open(checkpoint_file,'a') as f:
        print(label, file=f)


_checkpoint('here1')

# Change types to memory mapped dfs
y2 = switch_type(tmp_y,out_type="MemoryMappedDF")
conf2 = switch_type(tmp_conf,out_type="MemoryMappedDF")

# Get dimensions we are outputting to
out_dim = y2.shape

_checkpoint('here2')

# Save original index and columns for outputting later
y_index_original = y2.index
y_columns_original = y2.columns

# Get the y's we're interested in
y_current = y2[:,columns]

_checkpoint('here3')

# MARKER: still need to check we have at least 5 non-nan values here

# If we don't have nans work them out
if non_nan is None:
    non_nan = ~np.array(y_current.isna().astype(int).values,dtype=bool)

_checkpoint('here4')

# Subset y and conf to the appropriate rows
y_current = y_current[non_nan]
conf_current = conf2[:,:][non_nan] # Only time all data is read in

_checkpoint('here5')

# Save y index and columns
y_index = y_current.index
y_columns = y_current.columns

_checkpoint('here6')

# If we are demeaning
if demean:

    # Demean conf_current
    conf_current = nets_demean(conf_current)

_checkpoint('here7')

# We don't want to work on views of the data as it will slow the computation
conf_current = np.array(conf_current.values)
y_current = np.array(y_current.values)

_checkpoint('here8')

# Work out the part of y_current predicted by the confounds, using the
# requested method
# Check if we are using pseudo inverse
if mode.lower() == 'pinv':

    # Regress conf out of y_current - we perform the pseudo inverse on
    # conf^T @ conf as we expect the number of columns to be much(!) less
    # than the number of rows and thus this ends up being more numerically
    # stable than trying to invert, or approximately invert, conf itself.
    betahat = np.linalg.pinv(conf_current.T @ conf_current) @ conf_current.T @ y_current

    # Set computational zeros to actual zeros
    betahat[np.abs(betahat) < 1e-10] = 0

    # Predicted values to regress out
    predicted = conf_current @ betahat

# Otherwise use nets_svd
elif mode.lower() == 'nets_svd':

    # Left-singular vectors of conf from nets_svd (no rank reduction is
    # applied in this branch)
    U, S, _ = nets_svd(conf_current, reorder=False)

    # Predicted values to regress out
    predicted = U @ (U.T @ y_current)

# Otherwise use numpy's svd
elif mode.lower() == 'svd':

    # Singular value decomposition of conf
    U, S, _ = np.linalg.svd(conf_current, full_matrices=False)

    # Get the rank of the matrix
    rank = np.sum(S > 1e-10)

    # Keep only the first `rank` left-singular vectors
    U = U[:, :rank]

    # Predicted values to regress out
    predicted = U @ (U.T @ y_current)

else:

    # Perform qr decomposition
    Q, R = np.linalg.qr(conf_current)
    betahat = np.linalg.pinv(R) @ (Q.T @ y_current)

    # Set computational zeros to actual zeros
    betahat[np.abs(betahat) < 1e-10] = 0

    # Predicted values to regress out
    predicted = conf_current @ betahat

# Get deconfounding variable predicted values as a labelled dataframe
deconf_pred = pd.DataFrame(predicted)
deconf_pred.index = y_index
deconf_pred.columns = y_columns

_checkpoint('here9')

# Get deconfounded y
y_deconf_current = pd.DataFrame(y_current, index=y_index, columns=y_columns) - deconf_pred

_checkpoint('here10')

# If we are demeaning, demean y
if demean:
    y_deconf_current = nets_demean(y_deconf_current)

_checkpoint('here11')

# All-nan array with one row per original row of y (np.nan is used because
# the np.NaN alias no longer exists in NumPy 2.0)
y_deconf_current_with_nans = np.full((len(y_index_original),
                                      len(y_deconf_current.columns)), np.nan)

_checkpoint('here12')

# Fill the non-nan rows with the deconfounded values
y_deconf_current_with_nans[non_nan.flatten(),:] = y_deconf_current.values[:,:]

# Make into a dataframe with correct index and columns
y_deconf_current_with_nans = pd.DataFrame(y_deconf_current_with_nans,
                                         index=y_index_original,
                                         columns=y_deconf_current.columns)

# Indices for where to add to memmap: every row, and the positions of the
# current columns within the original column list
indices = np.ix_(np.arange(out_dim[0]),
                 [list(y_columns_original).index(column) for column in columns])

# Output filename
out_fname = os.path.join(os.getcwd(),'temp_mmap','y_deconf.dat')
addBlockToMmap(out_fname, y_deconf_current_with_nans.values, indices, out_dim, dtype=dtype)

# with open('/well/nichols/users/inf852/confounds/tmp.txt','a') as f:
#     print('here13', file=f)
    
# t2 = time.time()

# print('iteration time: ', t2-t1)





In [None]:
tmp44=np.memmap(out_fname,shape=out_dim,dtype=dtype)

In [None]:

# Indices for where to add to memmap, in transposed order: column positions
# first, row positions second
indices2 = np.ix_([list(y_columns_original).index(column) for column in columns],np.arange(out_dim[0]))


# Time writing the block to the memmap file with shape out_dim
t1 = time.time()
addBlockToMmap(out_fname, y_deconf_current_with_nans.values, indices, out_dim, dtype=dtype)
t2 = time.time()
print(t2-t1)

# Time writing the same block to a second file (out_fname + '2') using the
# transposed indices and the shape (out_dim[1], out_dim[0])
# NOTE(review): the values passed here are not transposed although the
# indices and shape are; confirm how addBlockToMmap handles this.
t1 = time.time()
addBlockToMmap(out_fname + str(2), y_deconf_current_with_nans.values, indices2, (out_dim[1],out_dim[0]), dtype=dtype)
t2 = time.time()
print(t2-t1)

In [None]:
tmp44=np.memmap(out_fname,shape=out_dim,dtype=dtype)
tmp45=np.memmap(out_fname + str(2),shape=(out_dim[1],out_dim[0]),dtype=dtype) 

In [None]:
tmp44.shape,tmp45.shape

In [None]:
np.all(tmp44[:,10]==tmp45[10,:])

In [None]:
np.sum(tmp45[10,:]==tmp44[:,10])

In [None]:
np.sum(np.isnan(tmp44[:,10]))

In [None]:
199+67271

In [None]:
tmp1 = np.memmap(os.path.join(os.getcwd(),'temp_mmap','y_deconf.dat'),shape=(y2.shape[1],y2.shape[0]),dtype=dtype)
tmp1 =