In 2/8/2020, there was a loss of the stor01 storage, including the external storage for the stack.PreprocessedStack table.  As a result, there are still entries for the affected items in the table, but fetches for the stacks for those entries would fail.  When the database was migrated from dj 0.11 to dj 0.12.9, Anthony Ramos automatically detected the missing external files, and replaced them with a short text message:

'External storage was lost on 2/8/2020, as a result the external for this stack is unavailable. \n                    The message you are seeing now is a result of a migration to dj 0.12.9 on 8/25/2021. If you would like this stack \n                    repopulated, please contact the pipeline engineer '

In order to correct this loss, it is not feasible to simply repopulate the entries in stack.PreprocessedStack, because there are too many downstream dependencies that would also be deleted and repopulated as a result, including published results that we do not want to lose the original records.  

Instead, we directly modify the files that are stored at the location indicated in the external path.

THIS SHOULD NOT BE A ROUTINE OPERATION.  THIS IS AN UNUSUAL CASE.

However, this allows us to replace those files, using the same code that was used to routinely populate them the first time, since we have prior knowledge that the code has not changed since that time.  This allows the population of additional tables that are dependent on those stacks, or using those stacks for other purposes (figures, export, etc).  

The below function was written by Anthony Ramos and modified by Paul Fahey for overwriting the error message text files with the correct stack files.  It operates by tracking the storage location from the external file using pymysql, recreates the stacks using prepackaged functions from the stack pipeline, and writes them to those file locations.  

Notably, dj 0.12.9 has a preliminary caching function that was elaborated in future versions (dj 0.13).  If you import stack, it sets dj.config['cache'] to /tmp/dj-cache.  The stack code will also preferentially fetch from the local cache if there is a dj.config['cache'] key.  This is usually preferable, since it will speed up the fetch, and it is very rare and violates fundamental data storage principles to modify the entries in the table without deleting and repopulating.  As mentioned above, this is a niche case that should not be generalized.  However, in this case, if you fetch the error message, that fetch will go into the cache.  Therefore, even if you modify the stack at the external location, fetchs to that entry will still return the error message from the cache instead of the stack, even though at the remote location that file has been successfully overwritten.  This can be avoided by simply deleting the dj.config['cache'] item, as seen in the import block below.  

All of the preprocessed stack_candidate_keys below have been replaced using this notebook, as of 2/18/2022.

In [1]:
import os
import scipy
import pymysql
import numpy as np

import matplotlib.pyplot as plt

import datajoint as dj
from pathlib import Path
from datajoint.utils import safe_write
from datajoint.blob import pack, unpack
from pipeline.utils import registration, enhancement
from pipeline.stack import CorrectedStack, PreprocessedStack

# necessary to prevent datajoint from automatically caching the external fetchs, 
# preventing the updated files from showing.  cache automatically set during stack import, maybe other places
del dj.config['cache']

Loading local settings from pipeline_config.json
Connecting pfahey@at-database.ad.bcm.edu:3306


In [3]:
dics = np.load('/mnt/scratch07/zhiwei/static_scan_release_keys_filtered.npy', allow_pickle=True)
stack_candidate_keys = [{k:v for k,v in zip(('animal_id','session','stack_idx'),
                                            d['stack'].split('-'))} for d in dics]
stack_candidate_keys

[{'animal_id': '21067', 'session': '9', 'stack_idx': '1'},
 {'animal_id': '21067', 'session': '10', 'stack_idx': '25'},
 {'animal_id': '21067', 'session': '13', 'stack_idx': '2'},
 {'animal_id': '22620', 'session': '4', 'stack_idx': '16'},
 {'animal_id': '22620', 'session': '4', 'stack_idx': '16'},
 {'animal_id': '22620', 'session': '5', 'stack_idx': '14'},
 {'animal_id': '22846', 'session': '2', 'stack_idx': '20'},
 {'animal_id': '22846', 'session': '2', 'stack_idx': '20'},
 {'animal_id': '22846', 'session': '7', 'stack_idx': '14'},
 {'animal_id': '22846', 'session': '10', 'stack_idx': '17'},
 {'animal_id': '23343', 'session': '5', 'stack_idx': '22'},
 {'animal_id': '23555', 'session': '5', 'stack_idx': '13'},
 {'animal_id': '23656', 'session': '14', 'stack_idx': '18'},
 {'animal_id': '23964', 'session': '4', 'stack_idx': '23'},
 {'animal_id': '26644', 'session': '14', 'stack_idx': '18'},
 {'animal_id': '26645', 'session': '2', 'stack_idx': '19'}]

In [6]:
key = (PreprocessedStack & stack_candidate_keys[1]).fetch1('KEY')
display(key)

{'animal_id': 21067,
 'session': 10,
 'stack_idx': 25,
 'volume_id': 1,
 'channel': 1}

In [7]:
# if the stack has not been replaced yet, will return a text file describing the cause of the loss
# if replaced, will return stack matrices
r,l,s = (PreprocessedStack & key).fetch1('resized','lcned','sharpened')
r,l,s

(array(['External storage was lost on 2/8/2020, as a result the external for this stack is unavailable. \n                    The message you are seeing now is a result of a migration to dj 0.12.9 on 8/25/2021. If you would like this stack \n                    repopulated, please contact the pipeline engineer '],
       dtype=object),
 array(['External storage was lost on 2/8/2020, as a result the external for this stack is unavailable. \n                    The message you are seeing now is a result of a migration to dj 0.12.9 on 8/25/2021. If you would like this stack \n                    repopulated, please contact the pipeline engineer '],
       dtype=object),
 array(['External storage was lost on 2/8/2020, as a result the external for this stack is unavailable. \n                    The message you are seeing now is a result of a migration to dj 0.12.9 on 8/25/2021. If you would like this stack \n                    repopulated, please contact the pipeline engineer '],
       d

In [None]:
# if the stack has not been replaced yet, will fail
# if replaced, will plot a slice from the preprocessed stacks
fig,axes = plt.subplots(1,3,figsize=(10,3))
for ax,s in zip(axes,(r,l,s)):
    ax.imshow(s[100,:,:])
    

In [None]:
# if the stacks have not been replaced yet, this function will recompute the 3 preprocessed stacks
# from corrected stack, look up the 3 external locations where the stack is supposed to be, and 
# overwrite the 3 files at those locations with the recomputed stacks


# replace_stack(key)

In [10]:
def replace_stack(key):
    connection = pymysql.connect(host='at-database.ad.bcm.edu',
                             user='root',
                             password='primate123',
                             database='pipeline_stack',
                             cursorclass=pymysql.cursors.DictCursor)
    with connection.cursor() as cursor:
        ## Gets all the hashes/filepaths for the given stack key
        cursor.execute(f"""SELECT * FROM pipeline_stack.`~external_stack` e 
            JOIN pipeline_stack.__preprocessed_stack p 
                ON e.hash = p.resized 
                    OR e.hash = p.lcned 
                    OR e.hash = p.sharpened 
            WHERE filepath is not null AND 
                p.animal_id={key['animal_id']} 
                AND p.stack_idx={key['stack_idx']} 
                AND p.session={key['session']}
                AND p.volume_id={key['volume_id']}
                AND p.channel={key['channel']}""")
        
        stacks = cursor.fetchall()
        
        #find_paths has the logic to determine which of the results from the join corresponds to which
        # stack. Since it's a join this is needed
        paths = find_paths(stacks)
        
        resized_hash = paths['resized']['filepath']
        lcned_hash = paths['lcned']['filepath']
        sharpened_hash = paths['sharpened']['filepath']
        
        #### START PIPELINE CODE 
        # Load stack
        folder = dj.config['stores']['stack']['location']

        stack = (CorrectedStack() & key).get_stack(key['channel'])

        # Resize to be 1 um^3

        um_sizes = (CorrectedStack & key).fetch1('um_depth', 'um_height', 'um_width')
        resized = registration.resize(stack, um_sizes, desired_res=1)

        # Enhance
        lcned = enhancement.lcn(resized, (3, 25, 25))


        # Sharpen
        sharpened = enhancement.sharpen_2pimage(lcned, 1)
        
        ### END PIPELINE CODE
        
        # need to pack the arrays the same way datajoint does so on a dj fetch it returns the right result
        resized_blob = pack(resized)
        lcned_blob = pack(lcned)
        sharpened_blob = pack(sharpened)

        
        
        # building all the paths
        resized_path = os.path.join(folder,resized_hash)
        lcned_path = os.path.join(folder,lcned_hash)
        sharpened_path = os.path.join(folder,sharpened_hash)
        
        # bypassing the safe_write allows it to overwrite the existing error message file
        print(resized_path)
        Path(resized_path).write_bytes(resized_blob)
        # safe_write(resized_path,resized_blob)
        print(lcned_path)
        Path(lcned_path).write_bytes(lcned_blob)
        # safe_write(lcned_path,lcned_blob)
        print(sharpened_path)
        Path(sharpened_path).write_bytes(sharpened_blob)
        # safe_write(sharpened_path,sharpened_blob)


def find_paths(results):
    paths = {}
    for stack in results:
        
        if(stack['hash'] == stack['resized']):
            paths['resized'] = stack
        elif (stack['hash'] == stack['lcned']):
            paths['lcned'] = stack
        elif (stack['hash'] == stack['sharpened']) :
            paths['sharpened'] = stack
        
    return paths
    
    