In [5]:
import numpy as np
import requests
from bs4 import BeautifulSoup
import re
import sys, os
import time
import datetime
from subprocess import Popen, check_output
import h5py
from progress.bar import Bar
from progress.spinner import Spinner
from wsmtk.utils import *
from contextlib import contextmanager, closing
import warnings
import itertools
import bisect
import gc
import array
import multiprocessing as mp
import traceback
try:
    import gdal
except ImportError:
    from osgeo import gdal


  from ._conv import register_converters as _register_converters


IndentationError: unexpected unindent (utils.py, line 129)

In [None]:
class MODISrawh5:
    '''Class for raw MODIS data collected into HDF5 file, ready for smoothing.

    For MOD/MYD 13 products, MOD and MYD are interleaved into a combined MXD.
    '''

    def __init__(self,files,param=None,targetdir=os.getcwd(),compression='gzip',chunk=120*120):
        
        '''Create a MODISrawh5 class

        Args:
            files ([str]): A list of absolute paths to MODIS raw hdf files to be processed
            param (str): VAM parameter to be processed (default VIM/LTD)
            targetdir (str): Target directory for raw MODIS HDF5 file
            compression (str): Compression method to be used (default = gzip)
        '''

        self.targetdir = targetdir
        #self.resdict = dict(zip(['250m','500m','1km','0.05_Deg'],[x/112000 for x in [250,500,1000,5600]])) ## commented for original resolution
        self.paramdict = dict(zip(['VIM', 'VEM', 'LTD', 'LTN'], ['NDVI', 'EVI', 'LST_Day', 'LST_Night']))
        self.compression = compression
        self.dts_regexp = re.compile(r'.+A(\d{7}).+')
        self.dates = [re.findall(self.dts_regexp,x)[0] for x in files]
        self.chunk = chunk
        self.files = [x for (y,x) in sorted(zip(self.dates,files))]
        self.dates.sort()
        self.nfiles = len(self.files)
        self.ref_file = self.files[0]
        self.ref_file_basename = os.path.basename(self.ref_file)

        # class works only for M.D11 and M.D13 products
        if not re.match(r'M.D13\w\d',self.ref_file_basename) and not re.match(r'M.D11\w\d',self.ref_file_basename):
            
            raise SystemExit("Processing only implemented for M*D11 or M*13 products!")

        # Patterns for string extraction
        ppatt = re.compile(r'M\w{6}')
        vpatt = re.compile('.+\.(\d{3})\..+')
        tpatt = re.compile(r'h\d+v\d+')

        # Open reference file
        ref = gdal.Open(self.ref_file)

        # When no parameter is selected, the default is VIM and LTD
        if not param:
            ref_sds = ref_sds = [x[0] for x in ref.GetSubDatasets() if self.paramdict['VIM'] in x[0] or self.paramdict['LTD'] in x[0]][0]
            self.param = [key for key, value in self.paramdict.items() if value in ref_sds][0]
            ref_sds = None
        elif param in self.paramdict.keys():
            self.param = param
        else:
            raise ValueError('Parameter string not recognized. Available parameters are %s.' % [x for x in self.paramdict.keys()])

        ref = None

        # check for MOD/MYD interleaving
        if self.param is 'VIM' and any(['MOD' in os.path.basename(x) for x in files]) and any(['MYD' in os.path.basename(x) for x in files]):
            self.product = [re.sub(r'M[O|Y]D','MXD',re.findall(ppatt,self.ref_file_basename)[0])]
            self.temporalresolution = 8
        else:
            self.product = re.findall(ppatt,self.ref_file_basename)
            self.temporalresolution = None

        # Name of file to be created/updated
        self.outname = '{}/{}/{}.{}.h5'.format(
                                    self.targetdir,
                                    self.param,
                                    '.'.join(self.product + re.findall(tpatt,self.ref_file_basename) + [re.sub(vpatt,'\\1',self.ref_file_basename)]),
                                    self.param)

        self.exists = os.path.isfile(self.outname)
        ref = None


    def create(self):
        '''Creates the HDF5 file.'''

        print('\nCreating file: {} ... '.format(self.outname), end='')

        ref = gdal.Open(self.ref_file)
        ref_sds = [x[0] for x in ref.GetSubDatasets() if self.paramdict[self.param] in x[0]][0]

        rst = gdal.Open(ref_sds)
        ref_sds = None

        self.rows = rst.RasterYSize
        self.cols = rst.RasterXSize
        self.nodata_value = int(rst.GetMetadataItem('_FillValue'))

        if re.match(r'M[O|Y]D13\w\d',self.ref_file_basename):
            if not self.temporalresolution:
                self.numberofdays = 16
                self.temporalresolution = self.numberofdays

            else:
                self.numberofdays = 16

        elif re.match(r'M[O|Y]D11\w\d',self.ref_file_basename):
            self.numberofdays = 8
            self.temporalresolution = self.numberofdays

        # Read datatype
        dt = rst.GetRasterBand(1).DataType

        # Parse datatype - on error use default Int16
        try:
            self.datatype = dtype_GDNP(dt)
        except IndexError:
            print("\n\n Couldn't read data type from dataset. Using default Int16!\n")
            self.datatype = (3,'int16')

        self.chunks = (self.chunk,100)

        trans = rst.GetGeoTransform()
        prj = rst.GetProjection()

        rst = None

        # Create directory if necessary
        if not os.path.exists(os.path.dirname(self.outname)):
            os.makedirs(os.path.dirname(self.outname))

        # Create HDF5 file
        try:

            with h5py.File(self.outname,'x',libver='latest') as h5f:
                dset = h5f.create_dataset('data',shape=(self.rows*self.cols,self.nfiles),dtype=self.datatype[1],maxshape=(self.rows*self.cols,None),chunks=self.chunks,compression=self.compression,fillvalue=self.nodata_value)
                h5f.create_dataset('dates',shape=(self.nfiles,),maxshape=(None,),dtype='S8',compression=self.compression)
                dset.attrs['geotransform'] = trans
                dset.attrs['projection'] = prj
                dset.attrs['resolution'] = trans[1] # res ## commented for original resolution
                dset.attrs['nodata'] = self.nodata_value
                dset.attrs['numberofdays'] = self.numberofdays
                dset.attrs['temporalresolution'] = self.temporalresolution
                dset.attrs['rows'] = self.rows
                dset.attrs['columns'] = self.cols

            self.exists = True
            print('done.\n')

        except:
            print('\n\nError creating {}! Check input parameters (especially if compression/filter is available) and try again. Corrupt file will be removed now. \n\nError message: \n'.format(self.outname))
            os.remove(self.outname)
            raise

    def update(self):
        '''Ingest raw data into MODIS raw HDF5 file.

        When a new HDF5 file is created, uodate will also handle the first data ingest.
        '''

        print('Processing MODIS files ...\n')

        try:

            print("READ METADATA")
            with h5py.File(self.outname,'r+',libver='latest') as h5f:
                dset = h5f.get('data')
                dts  = h5f.get('dates')
                self.chunks = dset.chunks
                self.rows = dset.attrs['rows'].item()
                self.cols = dset.attrs['columns'].item()
                self.nodata_value = dset.attrs['nodata'].item()
                self.numberofdays = dset.attrs['numberofdays'].item()
                self.temporalresolution = dset.attrs['temporalresolution'].item()
                self.datatype = dtype_GDNP(dset.dtype.name)
                #res  = dset.attrs['Resolution'] ## comment for original resolution
                dset.attrs['processingtimestamp'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

                # Insert new dates into existing date list
                dates_combined = [x.decode() for x in dts[...] if len(x) > 0 and x.decode() not in self.dates]

                #dates_combined = np.concatenate([dates_h5,self.dates])
                
                

                n = len(dates_combined)

                # if date list is bigger after insert, datasets need to be resized for additional data
                if n > dts.shape[0]:
                    dts.resize((n,))
                    dset.resize((dset.shape[0],n))

                sort_ix = np.argsort(dates_combined)

                # Write back date list
                #dts[...] = [n.encode("ascii", "ignore") for n in dates]

                # Manual garbage collect to prevent out of memory
                [gc.collect() for x in range(3)]

                parallel = True

                
                print("READY!")
            
                if parallel:

                    shared_arr = init_shared(self.chunks[0] * n)
                    
                    arr = tonumpyarray(shared_arr)

                    arr.shape = (self.chunks[0], n)
                    
                    ysize = int(self.chunks[0]/self.rows)

                    for b in range(0,dset.shape[0],self.chunks[0]):
                        
                        print(b)
                        
                        yoff = int(b/self.rows)
                        
                        for b1 in range(0,n,self.chunks[1]):
                            
                            arr[..., b1:b1+self.chunks[1]] = dset[b:b+self.chunks[0], b1:b1+self.chunks[1]]
                            
                        del b1

                        with closing(mp.Pool(processes=8,initializer = init, initargs = (shared_arr,arr.shape,self.files))) as pool:
                            
                            res = pool.map(wload,[(0,yoff,2400,ysize,fix,dates_combined.index(self.dates[fix])) for fix in range(0,self.nfiles)])
                            
                        

                        pool.close()
                        pool.join()

                        # sort

                        arr[...] = arr[...,sort_ix]

                        # write back
                        for b1 in range(0,n,self.chunks[1]):
                            dset[b:(b+self.chunks[0]), b1:(b1+self.chunks[1])] = arr[..., b1:(b1+self.chunks[1])]
                else:
                    
                    t3 = time.time()
                    
                    file_handles = []
                    
                    arr = np.zeros((self.chunks[0],self.nfiles),dtype=self.datatype[1])
                    
                    print('Opening files ...')
                    
                    t1 = time.time()

                    for ix,fl in enumerate(self.files):
                        
                        t2 = time.time()
                        
                        fl_o = gdal.Open(fl)

                        val_sds = [x[0] for x in fl_o.GetSubDatasets() if self.paramdict[self.param] in x[0]][0]

                        file_handles.append(gdal.Open(val_sds))

                        fl_o = None
                        
                        if ix%100 == 0:
                            print('File {} of {}. Elapsed {}.'.format(ix,self.nfiles,time.time()-t2))
                            
                    print("Done. Elapsed: {}".format(time.time() - t1))

                    ysize = self.chunks[0]//self.rows
                    
                    print('Writing data ...')
                    
                    bix = 0
                    nblocks = dset.shape[0]//self.chunks[0]

                    for b in range(0, dset.shape[0], self.chunks[0]):
                        bix += 1
                        t1 = time.time()

                        yoff = b//self.rows

                        for fix,f in enumerate(self.files):

                            dset[b:b+self.chunks[0], dates_combined.index(self.dates[fix])] = file_handles[fix].ReadAsArray(xoff=0,yoff=yoff,xsize=self.rows,ysize=ysize).flatten()

                        if bix%10==0:
                            print("block {} of {}. Elapsed: {}".format(bix,nblocks,time.time()-t1))
                            
                    for fix in range(len(file_handles)):
                        file_handles[fix] = None

                    del file_handles
                    
                    print("DONE! Total time: {}".format(time.time()-t3))
                

        except:
        
                raise

In [None]:
def init_parameters(**kwargs):
    '''Initialize parameters for processing in workers.'''

    params = dict()

    for key, value in kwargs.items():
        params[key] = value
    return params


def init(arr_,dim,files_):
    
    global arr
    global files    
    
    files = files_
    arr = tonumpyarray(arr_)
    arr.shape = dim
    
def wload(ix):
    
    xoff, yoff, xsize, ysize, fix, aix = ix
    try:
        ds = gdal.Open(files[fix])
        sds_o = gdal.Open(ds.GetSubDatasets()[0][0])
    
        arr[...,aix] =  sds_o.ReadAsArray(xoff,yoff,xsize,ysize).flatten()
        ds = None
        sds_o = None
    except AttributeError:
        print(files[fix])
    

In [12]:
import glob
import shutil

In [None]:
fls = glob.glob('/data/h5py_test/*hdf')

In [None]:
h5 = MODISrawh5(fls,chunk=480**2)


In [None]:
h5.create()

In [None]:
h5.update()

In [13]:
shutil.os.remove('/home/jovyan/VIM/MXD13A1.h17v07.006.VIM.h5')

In [None]:
t3 = time.time()
                    
file_handles = []
                    
arr = np.zeros((h5.chunk,len(fls)),dtype='int16')
                    
print('Opening files ...')
                    
t1 = time.time()

for ix,fl in enumerate(h5.files):
    
    t2 = time.time()
    
    fl_o = gdal.Open(fl)

    val_sds = fl_o.GetSubDatasets()[0][0]

    file_handles.append(gdal.Open(val_sds))

    fl_o = None
                        
    if ix%100 == 0:
        print('File {} of {}. Elapsed {}.'.format(ix,len(fls),time.time()-t2))
                            
print("Done. Elapsed: {}".format(time.time() - t1))

In [None]:
n = len(fls)

with h5py.File(h5.outname) as h5f:

    dset = h5f.get("data")
    print(dset.chunks)
    

    ysize = h5.chunk//h5.rows
                    
    print('Writing data ...')
        
    ix = np.arange(len(fls))

    bix = 0

    nblocks = dset.shape[0]//h5.chunk

    for b in range(0, dset.shape[0], dset.chunks[0]):
        bix += 1
        t1 = time.time()
        yoff = b//h5.rows
        
        for b1 in range(0, n, dset.chunks[1]):
            
            arr[...,b1:b1+dset.chunks[1]] = dset[b:b+dset.chunks[0],b1:b1+dset.chunks[1]]
        

        for fix,f in enumerate(h5.files):
            arr[...,h5.dates.index(h5.dates[fix])] = file_handles[fix].ReadAsArray(xoff=0,yoff=yoff,xsize=h5.rows,ysize=ysize).flatten()
        
        arr = arr[...,ix]
        
        for b1 in range(0, n, dset.chunks[1]):
            
            dset[b:b+dset.chunks[0],b1:b1+dset.chunks[1]] =  arr[...,b1:b1+dset.chunks[1]] 
        
        if bix%10==0:
            
            print("block {} of {}. Elapsed: {}".format(bix,nblocks,time.time()-t1))
            
        
        print("block {} of {}. Elapsed: {}".format(bix,nblocks,time.time()-t1))
        if bix == 5:
            break
                            

# for fix in range(len(file_handles)):
#     file_handles[fix] = None

# del file_handles
                    
print("DONE! Total time: {}".format(time.time()-t3))
                

In [None]:
(100*25)/60

In [None]:
class handler:
    def __init__(self,x):
        self.n = []
        for ii in range(x):
            self.n.append(ii)
            
        
    

In [None]:
h = handler(10)

In [None]:
for fix in range(len(file_handles)):
    file_handles[fix] = None

In [None]:
del file_handles

In [2]:

a = None

In [8]:
import gdal

In [9]:
ds = gdal.Open("test.tif")

In [10]:
ds.GetSubDatasets()

AttributeError: 'NoneType' object has no attribute 'GetSubDatasets'