In [1]:
import re
import os
import glob
import fnmatch
from tqdm.notebook import tqdm, tqdm_notebook

import numpy as np
import time

import dask
import dask.array as da
from dask.distributed import Client, progress, get_task_stream, as_completed
dask.config.set({'logging.distributed': 'error'})
from IPython.display import IFrame
from tqdm.notebook import tqdm

from copy import deepcopy

from SIFT_gs.FIBSEM_SIFT_gs import FIBSEM_frame

skimage version:  0.19.2
scipy version:    1.9.1
sklearn version:  1.0.2
Open CV version:  4.10.0
SIFT_gs version:  4.0.1


In [2]:
def del_file(filename):
    """Best-effort deletion of a single file.

    Parameters
    ----------
    filename : str or path-like
        Path of the file to remove.

    Returns
    -------
    (filename, deleted) : tuple
        The input path, and True if os.remove succeeded, False otherwise.
    """
    try:
        os.remove(filename)
        deleted = True
    except (OSError, TypeError, ValueError):
        # OSError: missing file, permission problem, path is a directory, ...
        # TypeError / ValueError: the argument is not a usable path.
        # KeyboardInterrupt / SystemExit are deliberately NOT caught, so a long
        # deletion loop can still be interrupted.
        deleted = False
    return filename, deleted

def get_size_gs(filename):
    """Return the size of a file in bytes, or 0 if it cannot be determined.

    Parameters
    ----------
    filename : str or path-like
        Path of the file to query.

    Returns
    -------
    (filename, size) : tuple
        The input path and its size as np.int64. The size is 0 when
        os.path.getsize fails, so a 0 can mean either an empty file or
        a file that could not be read.
    """
    try:
        size = np.int64(os.path.getsize(filename))
    except (OSError, TypeError, ValueError):
        # Missing / inaccessible file or unusable path argument.
        # KeyboardInterrupt / SystemExit are intentionally left to propagate.
        size = np.int64(0)
    return filename, size

def get_sizes_gs(filenames, **kwargs):
    """Get the on-disk size of every file in a list, optionally using DASK.

    Parameters
    ----------
    filenames : sequence of str
        Paths of the files to query. The input sequence is not modified.

    kwargs
    ------
    DASK_client : DASK client object or ''
        If it reports scheduler services, the work is distributed in batches;
        otherwise the sizes are collected locally. Default is '' (local).
    DASK_client_retries : int
        Number of retries passed to DASK_client.map. Default is 3.
    max_futures : int
        Maximum number of jobs submitted to DASK per batch. Default is 100000.

    Returns
    -------
    (filenames, filesizes) : tuple
        filenames : list of paths, in the same order as the input.
        filesizes : np.int64 array of the sizes in bytes (0 for files whose
            size could not be determined, see get_size_gs).
    """
    DASK_client = kwargs.get('DASK_client', '')
    DASK_client_retries = kwargs.get('DASK_client_retries', 3)
    # Guard against a zero / negative batch size, which would never make progress.
    max_futures = max(1, int(kwargs.get('max_futures', 100000)))

    # Work on a private copy so the caller's sequence is left untouched.
    pending = list(filenames)
    n_total = len(pending)

    try:
        client_services = DASK_client.scheduler_info()['services']
        if client_services:
            try:
                dport = client_services['dashboard']
            except KeyError:
                # Fall back to the 'bokeh' service key when 'dashboard' is absent.
                dport = client_services['bokeh']
            status_update_address = 'http://localhost:{:d}/status'.format(dport)
            print('DASK client exists. Will perform distributed computations')
            print('Use ' + status_update_address +' to monitor DASK progress')
            use_DASK = True
        else:
            print(time.strftime('%Y/%m/%d  %H:%M:%S')+'   DASK client does not exist. Will perform local computations')
            use_DASK = False
    except Exception:
        # The default '' (or any object without scheduler_info) ends up here.
        print(time.strftime('%Y/%m/%d  %H:%M:%S')+'   DASK client does not exist. Will perform local computations')
        use_DASK = False

    if use_DASK:
        out_filenames = []
        out_sizes = []
        # Submit at most max_futures jobs at a time to bound the number of live futures.
        for DASK_batch, start in enumerate(range(0, n_total, max_futures)):
            batch = pending[start:start + max_futures]
            remaining = n_total - start - len(batch)
            print(time.strftime('%Y/%m/%d  %H:%M:%S')+'   Starting DASK batch {:d} with {:d} jobs, {:d} jobs remaining'.format(DASK_batch, len(batch), remaining))
            futures = DASK_client.map(get_size_gs, batch, retries = DASK_client_retries)
            results = DASK_client.gather(futures)
            DASK_client.cancel(futures)
            out_filenames += [result[0] for result in results]
            out_sizes += [result[1] for result in results]
        filesizes = np.array(out_sizes, dtype = np.int64)
    else:
        out_filenames = [''] * n_total
        filesizes = np.zeros(n_total, dtype = np.int64)
        for j, filename in enumerate(tqdm(pending)):
            out_filenames[j], filesizes[j] = get_size_gs(filename)

    return out_filenames, filesizes

In [3]:
# Start / restart the DASK client.
# On a fresh kernel `client` is undefined (NameError), and a restart can also
# fail for other reasons; in either case a new local Client is created.
# Only Exception is caught so that interrupting a hung restart does not
# silently spawn a second cluster.
try:
    client.restart()
except Exception:
    client = Client()

# Set up a window to monitor the client progress.
# The dashboard port is looked up under 'dashboard', falling back to 'bokeh'.
try:
    dport = client.scheduler_info()['services']['dashboard']
except KeyError:
    dport = client.scheduler_info()['services']['bokeh']
print('Using Local Port:   ', dport)
status_update_address0 = 'http://localhost:{:d}/status'.format(dport)
print('Use ' + status_update_address0 +' to monitor DASK progress')
# Both names are kept because other cells refer to status_update_address0.
status_update_address = status_update_address0
# Last expression of the cell: renders the dashboard inline.
IFrame(src=status_update_address, width='100%', height='900px')

Using Local Port:    8787
Use http://localhost:8787/status to monitor DASK progress


# Analyze Directory `\\nearline4\fibsem\Backup_data`

In [4]:
%%time

root_dir_Backup_data = 'V:/Backup_data'

full_filenames_Backup_data = [os.path.join(d, x) for d, dirs, files in os.walk(root_dir_Backup_data) for x in files]
print('Root directory:', root_dir_Backup_data)
num_tot = len(full_filenames_Backup_data)
print('Total number of files in the root directory:      ', num_tot)
full_PNG_filenames = fnmatch.filter(full_filenames_Backup_data, '*.png')
num_PNG = len(full_PNG_filenames)
full_CSV_files = fnmatch.filter(full_filenames_Backup_data, '*.csv')
num_CSV = len(full_CSV_files)
full_XLSX_files = fnmatch.filter(full_filenames_Backup_data, '*.xlsx')
num_XLSX = len(full_XLSX_files)

print('Total number of *.png files in the root directory:', num_PNG)
print('Total number of *.csv files in the root directory:', num_CSV)
print('Total number of *.xlsx files in the root directory:', num_XLSX)

print('Total number of files to copy to tape:      ', (num_tot - num_PNG))
print('Total number of files to delete:      ', (num_tot - (num_PNG + num_CSV + num_XLSX)))
print('Total number of files to keep  :      ', num_PNG + num_CSV + num_XLSX)

except_masks = ['png', 'csv', 'xlsx']
filenames_to_delete_Backup_data = [file for file in full_filenames_Backup_data if np.array([file [-1*(len(except_mask)):] != except_mask for except_mask in except_masks]).all()]
print('Total number of files to delete (alt):', len(filenames_to_delete_Backup_data))

Root directory: V:/Backup_data
Total number of files in the root directory:       754569
Total number of *.png files in the root directory: 211764
Total number of *.csv files in the root directory: 299
Total number of *.xlsx files in the root directory: 18
Total number of files to copy to tape:       542805
Total number of files to delete:       542488
Total number of files to keep  :       212081
Total number of files to delete (alt): 542488
Wall time: 1min 52s


In [7]:
%%time
print('Using DASK. Use ' + status_update_address0 +' to monitor DASK progress')
futures = client.map(get_size_gs, full_filenames_Backup_data, retries = 5)
results = np.array(client.gather(futures))
filenames_Backup_data = [result[0] for result in results]
filesizes_Backup_data = [result[1] for result in results]

print('Root directory:', root_dir_Backup_data)
print('Total number of analyzed files in the root directory:        ', len(filenames_Backup_data))
print('Total size on disk of analyzed files in the root directory:  ', np.sum(np.int64(filesizes_Backup_data)))

Root directory: V:/Backup_data
Total number of analyzed files in the root directory:         754569
Total size on disk of analyzed files in the root directory:   174019315816137
