In [1]:
import os 
from os.path import join, basename
from glob import glob
import joblib
from collections import ChainMap
import traceback

class DateTimeIterator:
    """Iterate over every ``<base_path>/<date>/<init_time>`` directory path.

    The directory tree is scanned once, when the object is constructed;
    iteration then yields the stored paths in scan order. Calling ``iter()``
    on the object (which every ``for`` loop does) rewinds it, so the same
    instance can be looped over more than once.

    Parameters
    --------------
    base_path : path-like
        Directory whose sub-directories are dates, each of which holds
        initialization-time sub-directories.
    """
    def __init__(self, base_path='/work/mflora/SummaryFiles'):
        self.base_path = base_path
        self.base_wrfout_path = '/work/wof/realtime/FCST/'

        # Flatten the two-level date / init-time tree into one list of paths.
        self._paths = [
            join(self.base_path, date, init_time)
            for date in self.get_date_dirs()
            for init_time in self.get_init_time_dirs(date)
        ]

        self.index = 0

    def get_date_dirs(self):
        """Return the names of the date directories under ``base_path``.

        Entries containing 'txt' are skipped, as are entries that are not
        directories (listing a plain file in `get_init_time_dirs` would raise).
        """
        return [d for d in os.listdir(self.base_path)
                if 'txt' not in d and os.path.isdir(join(self.base_path, d))]

    def get_init_time_dirs(self, date):
        """Return the names of the init-time directories for ``date``.

        Entries containing 'base' are skipped, as are entries that are not
        directories.
        """
        date_path = join(self.base_path, date)
        return [t for t in os.listdir(date_path)
                if 'base' not in t and os.path.isdir(join(date_path, t))]

    def __iter__(self):
        # Rewind so that a second ``for`` loop over the same instance yields
        # the paths again instead of silently yielding nothing.
        self.index = 0
        return self

    def __next__(self):
        if self.index >= len(self._paths):
            raise StopIteration
        result = self._paths[self.index]
        self.index += 1
        return result


class SummaryFileValidater:
    """
    SummaryFileValidater checks for empty directories, missing summary files,
    and corrupted/missing WRFOUT files. The first task is to determine the number of WRFOUT
    files per date and initialization time. In this first task, we flag
    "corrupted" cases as situations where the number of files is inconsistent across the
    different ensemble members (at the moment, in cases where the file exists,
    this code does not explicitly check whether files are truly corrupted; i.e., unopenable).
    In the case that the WRFOUT path is empty, we treat the corresponding summary file path
    as empty as well and try to delete it (``os.rmdir`` only succeeds on empty directories).
    As for missing files, we flag a directory when the summary file count
    (for a given summary file type) differs from what the WRFOUT count implies.

    Parameters
    --------------
    base_path : path-like
        Root of the summary-file tree (``<base_path>/<date>/<init_time>``).

    n_jobs : int
        Number of processors for parallelization.

    Attributes
    --------------------
    empty_paths : list of paths
        Empty directories.

    missing_files : dict
        Maps each entry of ``SUMMARY_FILE_TYPES`` to the list of directories
        whose file count for that type is not the expected one.

    corrupted_files : list of paths
        Corrupted WRFOUT files (never populated yet; see `_find_corrupted_files`).

    wrf_file_count : dict
        Set by calling the instance. Maps a normalized summary directory path
        to its WRFOUT count per ensemble member (0 if none were found,
        -1 if the members disagree).

    Notes
    --------------------
    Assigning to ``empty_paths``, ``missing_files`` or ``corrupted_files``
    *appends* to the stored collection rather than replacing it.
    """
    SUMMARY_FILE_TYPES = ['ENS', 'SVR', 'ENV', '30M', '60M', 'SND', 'SWT']

    RLT_DATES = ['20170509',
                 '20170517',
                 '20170527',
                 '20170518',
                 '20170516',
                 '20170523']

    def __init__(self, base_path = '/work/mflora/SummaryFiles', n_jobs=30 ):
        self.n_jobs = n_jobs
        self.base_path = base_path
        self._empty_paths = []
        self._missing_files = {t : [] for t in self.SUMMARY_FILE_TYPES}
        self._corrupted_files = []

    def __call__(self):
        """Run the full validation pass.

        NOTE: this has a side effect on disk -- summary directories whose
        WRFOUT count is zero are removed with ``os.rmdir``.
        """
        # For a summary file path, get the count of WRFOUT files.
        # When the corresponding WRFOUT file path is empty, then
        # we know the summary file path is empty and thus it can be
        # removed. We can also use it to determine if there are missing files.
        self.wrf_file_count = self.get_num_wrf_files()
        # Reuse the counts just computed instead of scanning the WRFOUT tree twice.
        self._remove_empty_paths(self.wrf_file_count)
        # In case not all empty paths are removed,
        # we need to keep track of those paths.
        self._find_empty_paths()
        self._find_missing_files()

    @property
    def empty_paths(self):
        """The empty_paths property."""
        return self._empty_paths

    @property
    def missing_files(self):
        """The missing files property."""
        return self._missing_files

    @property
    def corrupted_files(self):
        """The corrupted files property."""
        return self._corrupted_files

    def _appender(self, attr, paths, file_type=None):
        """Append ``paths`` (one path or a list of paths) to ``attr``.

        When ``file_type`` is given, ``attr`` is treated as a dict and the
        paths are added to ``attr[file_type]``.
        """
        target = attr if file_type is None else attr[file_type]
        if isinstance(paths, list):
            target.extend(paths)
        else:
            target.append(paths)

    @empty_paths.setter
    def empty_paths(self, paths):
        self._appender(self._empty_paths, paths)

    @missing_files.setter
    def missing_files(self, arg):
        # ``arg`` is a ``(path_or_paths, file_type)`` pair.
        self._appender(self._missing_files, arg[0], arg[1])

    @corrupted_files.setter
    def corrupted_files(self, paths):
        self._appender(self._corrupted_files, paths)

    def _summary_paths(self):
        """Return every ``<base_path>/<date>/<init_time>`` path as a list.

        A list is returned (rather than the iterator itself) so the paths can
        be traversed several times, e.g. once per summary file type.
        """
        return list(DateTimeIterator(self.base_path))

    def _lookup_wrf_count(self, path):
        """Return the WRFOUT count stored for ``path``.

        ``get_num_wrf_files`` keys its result by normalized path, so fall back
        to the normalized form when the raw path is not a key. Raises
        ``KeyError`` if the path is unknown.
        """
        counts = self.wrf_file_count
        if path in counts:
            return counts[path]
        return counts[os.path.normpath(path)]

    def _is_dir_empty(self, path):
        """Return True if the directory ``path`` has no entries."""
        # The context manager closes the scandir handle deterministically.
        with os.scandir(path) as entries:
            return not any(entries)

    def _find_empty_paths(self,):
        """Find empty directories and record them in ``empty_paths``."""
        def worker(path):
            if self._is_dir_empty(path):
                return path
            return None

        results = joblib.Parallel(n_jobs = self.n_jobs,
                        backend='loky',
                        verbose=0)(joblib.delayed(worker)(path)
                                   for path in self._summary_paths())

        for path in results:
            if path is not None:
                self.empty_paths = path

    def _remove_empty_paths(self, wrf_file_count=None):
        """ Remove empty directories that are not associated with
        missing and/or corrupt files.

        Parameters
        --------------
        wrf_file_count : dict, optional
            Result of `get_num_wrf_files`. Computed here when omitted.
        """
        if wrf_file_count is None:
            wrf_file_count = self.get_num_wrf_files()
        for path, count in wrf_file_count.items():
            # The WRFOUTs for this date and init time are
            # empty, so the path can be removed because
            # it is empty as well.
            if count == 0:
                try:
                    # rmdir refuses to delete a non-empty directory, so
                    # existing summary files are never destroyed here.
                    os.rmdir(path)
                except OSError:
                    print(f'Unable to remove {path}. Likely permission issues or was not an empty dir!')
                    print(traceback.format_exc())

    def _find_missing_files(self,):
        """ Find the missing files.

        For every summary file type, record in ``missing_files`` each directory
        whose number of ``wofs_<type>_*`` files fails `_check_file_count`.
        """
        if getattr(self, 'wrf_file_count', None) is None:
            self.wrf_file_count = self.get_num_wrf_files()

        paths = self._summary_paths()
        known_empty = set(self._empty_paths)

        def worker(path, file_type):
            # The path is empty.
            if path in known_empty:
                return None
            count = self._lookup_wrf_count(path)
            # 0: no WRFOUTs were found for this path.
            # -1: the WRFOUTs are inconsistent amongst the ensemble members.
            if count in (0, -1):
                return None

            files = glob(join(path, f'wofs_{file_type}_*'))
            if not self._check_file_count(path, file_type, files):
                # The number of files is not the one implied by the WRFOUTs.
                return path
            return None

        for file_type in self.SUMMARY_FILE_TYPES:
            # ``paths`` is a list, so every file type sees every directory.
            flagged = joblib.Parallel(n_jobs = self.n_jobs,
                        backend='loky',
                        verbose=0)(joblib.delayed(worker)(path, file_type)
                                   for path in paths)

            for path in flagged:
                if path is not None:
                    self.missing_files = (path, file_type)

    def _check_file_count(self, path, file_type, files):
        """Check that the correct number of summary files exists in a directory.
        For half-hour (top-of-the-hour) forecasts, there should be 36 (72) files.

        Parameters
        --------------
        path : path-like
            Summary directory; must have an entry in ``wrf_file_count``.
        file_type : str
            One of ``SUMMARY_FILE_TYPES``.
        files : list
            The summary files found for ``file_type`` in ``path``.

        Returns
        --------------
        bool
            True if ``len(files)`` is the count expected from the WRFOUTs.
        """
        n_files = len(files)
        true_count = self._lookup_wrf_count(path)

        if file_type == '60M':
            # One 60M file per 12 WRFOUT files.
            return n_files == int(true_count/12)
        elif file_type == '30M':
            return n_files in [true_count - (6+1), true_count - 6]
        else:
            return n_files == true_count

    def get_num_wrf_files(self,):
        """For each date and initialization time, build a dictionary
        determining if the number of forecast outputs per ensemble member is consistent
        and storing that number.

        Returns
        --------------
        dict
            Maps the normalized summary directory path to the per-member
            WRFOUT count, 0 when no ``ENS_MEM*`` directory is found, or -1
            when the members do not all have the same count.
        """
        def worker(path):
            path = os.path.normpath(path)
            date, init_time = path.split(os.sep)[-2:]
            year = date[:4]

            # Locate the WRFOUT directory for this date; archives are
            # searched in this order.
            if date in os.listdir('/work/nusrat.yussouf/HMT/FCST/'):
                _basePath = join('/work/nusrat.yussouf/HMT/FCST/', date, init_time)
            elif date in os.listdir('/scratch/wof/realtime/FCST/compressed/FCST/'):
                _basePath = join('/scratch/wof/realtime/FCST/compressed/FCST/', date, init_time)
            elif date in self.RLT_DATES:
                _basePath = join('/work/wof/realtime/FCST/', year, date, 'RLT', init_time)
            else:
                _basePath = join('/work/wof/realtime/FCST/', year, date, init_time)

            member_dirs = glob(join(_basePath, 'ENS_MEM*'))

            # The output file prefix differs before 2019.
            pattern = 'wrfout*' if int(year) < 2019 else 'wrfwof*'
            lens = [len(glob(join(p, pattern))) for p in member_dirs]

            # Check that each ensemble member has the same number of forecast outputs.
            if not lens:
                count = 0
            elif lens.count(lens[0]) == len(lens):
                count = lens[0]
            else:
                count = -1

            return {path: count}

        per_path_counts = joblib.Parallel(n_jobs = self.n_jobs,
                        backend='loky',
                        verbose=0)(joblib.delayed(worker)(path)
                                   for path in self._summary_paths())

        # Turn list of dicts into a single dict.
        return dict(ChainMap(*per_path_counts))

    def _find_corrupted_files(self,):
        """Placeholder; currently does nothing."""
        pass
        #TODO: Actually determine corrupt WRFOUT files
        # by checking if they can be opened or if they
        # are much smaller in size.

In [2]:
# Find the missing files.
# Calling the validator counts the WRFOUT files for every date / init time,
# then looks for summary directories with an unexpected number of files.
# NOTE: the call also tries to os.rmdir() every summary directory whose
# WRFOUT count is zero, so running this cell can delete (empty) directories.
validater = SummaryFileValidater()
validater()

In [3]:
import pickle
# Save the validator object to a pickle file in the current working
# directory so it can be reloaded later without re-running the validation.
with open('summary_file_validater.pkl', 'wb') as f:
    pickle.dump(validater, f,)

In [4]:
# Reload the validator from the pickle file and rebind `validater` to it.
# NOTE: pickle.load can execute arbitrary code; only load a file you wrote
# yourself. Unpickling also needs the SummaryFileValidater class to be defined.
with open('summary_file_validater.pkl', 'rb') as f:
    validater = pickle.load(f)

# Show the dict mapping each summary file type to the list of flagged directories.
validater.missing_files

{'ENS': ['/work/mflora/SummaryFiles/20180620/1830',
  '/work/mflora/SummaryFiles/20170608/2300',
  '/work/mflora/SummaryFiles/20170608/0200',
  '/work/mflora/SummaryFiles/20170608/1900',
  '/work/mflora/SummaryFiles/20170608/0300',
  '/work/mflora/SummaryFiles/20170608/2200',
  '/work/mflora/SummaryFiles/20170608/2000',
  '/work/mflora/SummaryFiles/20170608/0100',
  '/work/mflora/SummaryFiles/20170608/0000',
  '/work/mflora/SummaryFiles/20170608/2130',
  '/work/mflora/SummaryFiles/20170608/0030',
  '/work/mflora/SummaryFiles/20170608/0130',
  '/work/mflora/SummaryFiles/20170608/2030',
  '/work/mflora/SummaryFiles/20170608/2230',
  '/work/mflora/SummaryFiles/20170608/0230',
  '/work/mflora/SummaryFiles/20170608/2330',
  '/work/mflora/SummaryFiles/20180618/1830',
  '/work/mflora/SummaryFiles/20180712/1830',
  '/work/mflora/SummaryFiles/20180718/1830',
  '/work/mflora/SummaryFiles/20170605/0030',
  '/work/mflora/SummaryFiles/20170605/2130',
  '/work/mflora/SummaryFiles/20170605/2030',
  '