# Install requirements

The cloud dataflow VMs don't have gdal installed, and it's a bit of a nightmare to do.
First install the system binaries for it

In [None]:
!apt-get update

In [None]:
!apt-get --assume-yes install gdal-bin libgdal-dev python3-dev

In [5]:
!gdal-config --version

2.2.3


Only then can we successfully pip-install the python GDAL bindings. It needs two environment variables to point it at the system binaries. Then install the same version that apt-get installed of the binaries.

In [1]:
import os
os.environ['CPLUS_INCLUDE_PATH']="/usr/include/gdal"
os.environ['C_INCLUDE_PATH']="/usr/include/gdal"

In [None]:
!pip install gdal==2.2.3

In [7]:
!python -c "from osgeo import gdal;print(gdal.__file__)"

/opt/conda/lib/python3.7/site-packages/osgeo/gdal.py


In [8]:
!python -c "import sys; print(sys.executable)"

/opt/conda/bin/python


In [11]:
import sys, os

In [12]:
print(sys.executable)

/root/apache-beam-custom/bin/python


We just installed it into whatever python gets called at a shell python command, which looks like it is a conda root environment? That's not the same one as is running this jupyterlab notebook. Don't really understand what the correct way is to install a new module into jupyterlab's environment, but whatever, just hacking it seems to work for now:

In [39]:
sys.path.append('/opt/conda/lib/python3.7/site-packages')

In [14]:
from osgeo import gdal

In [15]:
from retrying import retry

End of setup, time to move on

In [16]:
import re
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.io.gcp.gcsio import GcsIO
import tempfile


In [17]:
from getpass import getpass

In [18]:
username='harrygibson'
password=getpass()
YEAR_FROM = 2019
YEAR_TO = 2020
DOY_START = 20
DOY_END = 40
TILE = '*'
BASE_URL = "http://e4ftl01.cr.usgs.gov"
platform = "MOLT"
product = "MOD11A2.006"
product_url = f"{BASE_URL}/{platform}/{product}"
product_url
LAYER_TEMPLATE = 'HDF4_EOS:EOS_GRID:"{}":MODIS_Grid_8Day_1km_LST:LST_Day_1km'
VAR_NAME = "LST_Day"

# or 'HDF4_EOS:EOS_GRID:"{}":MODIS_Grid_8Day_1km_LST:LST_Night_1km'

 ·········


In [38]:
BUCKET_PATH = "gs://hsg-dataflow-test/lst_download_dev"
HDF_BUCKET_PATH = BUCKET_PATH + '/hdf-notebook' #NB not os.path.join as that breaks on windows with backslash paths
OUTPUT_BUCKET_PATH = BUCKET_PATH + "/output-notebook"

In [None]:
# 8-daily dates for use when the product is daily but we only want 8-daily:
# days = ['001','009','017','025','033','041','049','057','065','073','081','089','097','105','113','121',
#         '129','137','145','153','161','169','177','185','193','201','209','217','225','233','241','249',
#         '257','265','273','281','289','297','305','313','321','329','337','345','353','361']
# BRDF product:
# https://e4ftl01.cr.usgs.gov/MOTA/MCD43D62.006/ thru MCD43D68
#  covars = {}
# covars['EVI'] = "((((B2 - B1)*_MODIS_SCALE_CONST) / ((B2 + (B1 * _EVI_C1) - (B3 * _EVI_C2))*_MODIS_SCALE_CONST + _EVI_L) * _EVI_G))"
# covars['TCB'] = "(B1*TCB0 + B2*TCB1 + B3*TCB2 + B4*TCB3 + B5*TCB4 + B6*TCB5 + B7*TCB6)*_MODIS_SCALE_CONST"
# covars['TCW'] = "(B1*TCW0 + B2*TCW1 + B3*TCW2 + B4*TCW3 + B5*TCW4 + B6*TCW5 + B7*TCW6)*_MODIS_SCALE_CONST"
 # EVI coefficients 
# _EVI_C1 = 6.0
# _EVI_C2 = 7.5
# _EVI_L = 1.0 
# _EVI_G = 2.5

# TCB coefficients 
#TCB0 = 0.4395
#TCB1 = 0.5945
#TCB2 = 0.2460
#TCB3 = 0.3918
#TCB4 = 0.3506
#TCB5 = 0.2136
#TCB6 = 0.2678

# TCW coefficients
# TCW0 = 0.1147
# TCW1 = 0.2489
# TCW2 = 0.2408
# TCW3 = 0.3132
# TCW4 = -0.3122
# TCW5 = -0.6416
# TCW6 = -0.5087

# scale conversion
## THIS CHANGED FROM V5 EVEYTHING WAS SHIT (v5 was 0.0001)
#_MODIS_SCALE_CONST = 0.001

## Functions to parse dates from the MODIS DAAC pages

These are local functions, they don't need to run within the Beam pipeline, there's only a single http fetch and then some local list processing. They're more or less taken from get_modis (https://github.com/jgomezdans/get_modis)

In [20]:
def generate_selected_dates(year_from=2000, year_to=2020, doy_start=1, doy_end=-1):
    import calendar, time
    dates = []
    for year in range(year_from, year_to+1):
        if doy_end == -1:
            if calendar.isleap(year):
                end_day = 367
            else:
                end_day = 366
        else:
            end_day = doy_end
        dates_this_yr = [time.strftime("%Y.%m.%d", time.strptime("%d/%d" % (i, year),
                                                         "%j/%Y")) for i in
                 range(doy_start, end_day)]
        dates.extend(dates_this_yr)
    return dates

In [21]:
def get_existing_files(out_dir):
    # in case we need to do something different to list files on bucket
    return os.listdir(out_dir)

def load_page_text(url):
    import requests, time
    # nasa data pools are unavailable for maintenance on wednesday afternoons
    the_day_today = time.asctime().split()[0]
    the_hour_now = int(time.asctime().split()[3].split(":")[0])
    if the_day_today == "Wed" and 14 <= the_hour_now <= 17:
        LOG.info("Sleeping for %d hours... Yawn!" % (18 - the_hour_now))
        time.sleep(60 * 60 * (18 - the_hour_now))
    resp = requests.get(url)
    return resp.text
    
def parse_modis_dates (product_url, requested_dates, product, out_dir, check_existing_dates=False ):
    """Parse returned MODIS dates.

    This function gets the dates listing for a given MODIS products, and
    extracts the dates for when data is available. Further, it crosses these
    dates with the required dates that the user has selected and returns the
    intersection. Additionally, if the `checkExistingDates` flag is set, we'll check for
    files that might already be present in the system and skip them. Note
    that if a file failed in downloading, it might still be around
    incomplete.

    Parameters
    ----------
    url: str
        A top level product URL such as "http://e4ftl01.cr.usgs.gov/MOTA/MCD45A1.005/"
    dates: list
        A list of required dates in the format "YYYY.MM.DD"
    product: str
        The product name, MOD09GA.005
    out_dir: str
        The output dir
    checkExistingDates: bool
        Whether to check for present files
    Returns
    -------
    A (sorted) list with the dates that will be downloaded.
    """
    import time
    if check_existing_dates:
        product = product_url.strip('/').split('/')[-1]
        product_no_version = product.split(".")[0]
        already_here = fnmatch.filter(get_existing_files(out_dir),
                                      "%s*hdf" % product_no_version)
        already_here_dates = [x.split(".")[-5][1:]
                              for x in already_here]

    html = load_page_text(product_url).split('\n')

    available_dates = []
    for line in html:
        if line.find("href") >= 0 and \
                        line.find("[DIR]") >= 0:
            # Points to a directory
            the_date = line.split('href="')[1].split('"')[0].strip("/")
            if check_existing_dates:
                try:
                    modis_date = time.strftime("%Y%j",
                                               time.strptime(the_date,
                                                             "%Y.%m.%d"))
                except ValueError:
                    continue
                if modis_date in already_here_dates:
                    continue
                else:
                    available_dates.append(the_date)
            else:
                available_dates.append(the_date)

    dates = set(requested_dates)
    available_dates = set(available_dates)
    suitable_dates = list(dates.intersection(available_dates))
    suitable_dates.sort()
    return suitable_dates

# Start the pipeline


### Generate a list of the dates for which data are required and available

This will be the input to our pipeline (possibly along with tiles). We will set up three test cases:
* four arbitrary dates
* all dates in one year (2019)
* all dates available

In each case we will generate a list of the string dates then make a pipeline to build them into a PCollection comprising the URLs of the page for the date

In [22]:
real_dates = generate_selected_dates(YEAR_FROM, YEAR_TO, DOY_START, DOY_END)
TEST_DATES_SPECIFIC = parse_modis_dates(product_url, real_dates, product, "C:\\temp")

all_2019_dates = generate_selected_dates(2019, 2019, 1, 366)
TEST_DATES_WHOLE_YEAR = parse_modis_dates(product_url, all_2019_dates, product, False)

all_available_dates = generate_selected_dates(2000, 2020, 1, 366)
TEST_DATES_ALL_TIME = parse_modis_dates(product_url, all_available_dates, product, False)

We'll also set up three test cases for which tiles should be downloaded:

* A block of four tiles - we'll test this for all available dates
* All tiles in Africa - we'll test this for a whole year
* All tiles globally - we'll test this for four dates only

In [23]:
TEST_TILES_FOUR = ['h17v03', 'h18v03', 'h17v04', 'h18v04']

#africa: h16:23, v5:12
import itertools
TEST_TILES_AFRICA = [f'h{pair[0]:02}v{pair[1]:02}' for pair in (itertools.product(range(16,24), range(5,13)))]

TEST_TILES_ALL = '*'

### Select which one we're doing

MODIFY THIS NEXT CELL

In [24]:
DATES_TO_TEST = [TEST_DATES_SPECIFIC, TEST_DATES_WHOLE_YEAR, TEST_DATES_ALL_TIME][0]
TILES_TO_TEST = [TEST_TILES_FOUR, TEST_TILES_AFRICA, TEST_TILES_ALL][0]

## Get the HDF urls from the date pages

For each date, we need to load the page and parse the URLs to the HDF files. Define a PTransform to do this and filter to keep only URLs that are to a tile we need.

We return a tuple of (ImageInfo, hdf_url) for each HDF tile to be downloaded. The ImageInfo keys will be duplicated across each tile for the image, i.e. this is a fanout operation. The pipeline should call reshuffle() after applying this transform to allow the many URLs to be downloaded across more machines. It's possible that we ought to do this only for the ones that need to be downloaded?

In [25]:
class PopulateHdfUrls(beam.PTransform):
                
    def load_page_text_to_lines(self, imInfo):
        import requests
        resp = requests.get(imInfo.HDF_PAGE_URL)
        lines = resp.text.split('\n')
        return [(imInfo, l) for l in lines]
        #return beam.Create(lines)
    
    def parse_hdf_from_line(self, imInfo, textline):
        if textline.find('.hdf"') != -1:
            return (imInfo, imInfo.HDF_PAGE_URL + '/' + textline.split('<a href="')[1].split('">')[0])
    
    def check_tile_is_needed(self, imInfo, url):
        import os
        thistile = os.path.basename(url).split(".")[2]
        return imInfo.TILES == "*" or thistile in imInfo.TILES
    
    def expand(self, pcoll):
        return (pcoll
                | "Load_page_lines" >> beam.FlatMap(lambda imInfo: self.load_page_text_to_lines(imInfo))
                #| "Flatten" >> beam.Flatten()
                | "discover_hdf_urls" >> beam.MapTuple(lambda imInfo, line:self.parse_hdf_from_line(imInfo, line))
                | "remove_non_matching_lines" >> beam.Filter(lambda l: l is not None)
                | "remove_non_required_tile_urls" >> beam.Filter(lambda info_url_tpl: self.check_tile_is_needed(info_url_tpl[0], info_url_tpl[1]))
               )
    

# Download the files

We have a list of HDF URLs that are needed for the given day. Now get a URL for each of them on the storage bucket. This means checking the bucket to see if we've already downloaded it (when producing another image from the same set of HDFs) and downloading it to the bucket from NASA if not. Then returning the bucket path.

Ideally this should be developed to take account of the version part of their filename i.e. note if we already have a tile but of an earlier version and do {something}. At present we just look for matching tile and date.

The PTransform class maintains an authenticated session object as a member variable as i guess this might be better than logging in for every tile.

In [26]:
class GetMissingHdfsToBucket(beam.PTransform):
    # Take a PCollection of (ImInfo, One HDF URL required for image) tuples
    # return a PCollection of (ImInfo, GCS path to HDF) tuples for all HDFs needed for this date image
    # The ImInfos are duplicated i.e. one per download url, this is so that the downloads can spread 
    # more broadly.
    # If they are already on the bucket we return the bucket path; if not we download to bucket THEN 
    # return the bucket path
    
    def __init__(self, hdf_bucket_path, nasa_user, nasa_pw):
        import requests
        gcs = GcsIO()
        self._existing_file_paths = list(gcs.list_prefix(hdf_bucket_path).keys())
        self._existing_file_names = [os.path.basename(l) for l in self._existing_file_paths]
        
        self._session = requests.Session()
        self._session.auth = (nasa_user, nasa_pw)
        self._hdf_bucket_path = hdf_bucket_path
        self._chunk_size = 8 * 1024 * 1024
        
    def bucket_path_if_existing(self, iminfo, url):
        filename = os.path.basename(url)
        matching_on_bucket = [bp for bp in self._existing_file_paths if filename in bp]
        if len(matching_on_bucket) > 0:
            return (iminfo, matching_on_bucket[0], True)
        else:
            return (iminfo, url, False)
        
    @retry(stop_max_attempt_number=7, wait_fixed=3000)
    def download_file(self, iminfo, url):
        import requests, tempfile, os
        from apache_beam.io.gcp.gcsio import GcsIO
        req = self._session.request('get', url)
        resp = self._session.get(req.url, stream=True)
        product, datestr, fname = url.split('/')[-3:]
        bucketpath = '/'.join([self._hdf_bucket_path, product, datestr, fname])
        gcs = GcsIO()
        with gcs.open(bucketpath, 'w') as fp:
        #with open(tempfilename, 'wb') as fp:
            for chunk in resp.iter_content(chunk_size=self._chunk_size):
                if chunk:
                    fp.write(chunk)
            fp.flush()
            #os.fsync(fp)
        return (iminfo, bucketpath)
    
    def expand(self, pcoll): #(iminfo, hdfurl)
        # get tuple of (iminfo, bucketpath, True) if already done, or retain (iminfo, url, False) if not
        pre_existing_bucket_paths = (pcoll | "convert_to_gcs_paths" >> beam.MapTuple(self.bucket_path_if_existing))
        # filter into a pcoll of existing and a pcoll needing download
        to_copy = pre_existing_bucket_paths | beam.Filter(lambda d: d[2]) | beam.Map(lambda t: (t[0], t[1]))
        to_download = pre_existing_bucket_paths | beam.Filter(lambda d: not d[2]) | beam.Map(lambda t: (t[0], t[1]))
        download_results = to_download | beam.MapTuple(self.download_file) 
        all_bucket_paths = (to_copy, download_results) | beam.Flatten() 
        return all_bucket_paths

## (Virtually) mosaic the files

Build a GDAL vrt for each image's tiles to mosaic them together. This requires knowledge of which layer needs to be extracted from the HDF, so that info is recorded on the ImageInfo elements.

Although GDAL can in theory read inputs straight from bucket storage (/vsigs/) this isn't necessarily built in and needs a different authentication flow so would be a lot more complex. Instead we copy the files back to worker's local storage and work on them there.

This transform needs to be called once for every output image, so after running the download step the results need to be grouped by key (ImageInfo) before running this transform.

This transform probably needs to be replaced with a DoFn (and include the subsequent tiff production one, too) to ensure that the localise/vrt/calculate/reproject steps for a single image all run on the same machine!

In [69]:
class BuildVrt(beam.PTransform):
    def __init__(self):
        import os
        from apache_beam.io.gcp.gcsio import GcsIO
        #self._hdfpath = bucketpath
        #gcs = GcsIO()
        #self._existing = [l for l in list(gcs.list_prefix(hdf_bucket_path).keys())]
            
    def get_tmp_folder_for_day(self, day):
        tmpfolder = tempfile.gettempdir()
        workfolder = os.path.join(tmpfolder, day)
        return workfolder
    
    def localise_day_files(self, iminfo):
        day = iminfo.DATE_STR
        gspaths  = iminfo.HDF_GS_PATHS
        tempfolder = self.get_tmp_folder_for_day(day)
        localpaths = []
        gcs = GcsIO()
        if not os.path.isdir(tempfolder):
            os.makedirs(tempfolder)
        for f in gspaths:
            localname = os.path.join(tempfolder, os.path.basename(f))
            if not os.path.exists(localname):
                with gcs.open(f) as gcsfile, open(localname, 'wb') as localfile:
                    localfile.write(gcsfile.read())
            localpaths.append(localname)
        iminfo.HDF_LOCAL_PATHS=localpaths
        return iminfo
    
    def build_vrt_file(self, iminfo):
        gdaltemplate = iminfo.HDF_LYR_TEMPLATE
        localpaths = iminfo.HDF_LOCAL_PATHS
        lyrpaths = [gdaltemplate.format(f) for f in localpaths]
        tempfolder = os.path.dirname(localpaths[0])
        vrtfilename = os.path.join(tempfolder, "{}.{}.vrt".format(
            iminfo.VRT_VAR_NAME, iminfo.DATE_STR))
        vrtobj = gdal.BuildVRT(vrtfilename, lyrpaths)
        vrtobj.FlushCache()
        vrtobj = None
        iminfo.VRT_LOCAL_PATH = vrtfilename
        return iminfo
        
    def expand(self, pcoll): # iminfo objects populated with gs paths
        return (pcoll | "copy_files_to_local" >> beam.Map(self.localise_day_files)
             | "build_vrts_on_local" >> beam.Map(self.build_vrt_file) 
            )


In [None]:
class CreateCalculatedOutput(beam.PTransform):
    

## Create the calculated output data

Define a PTransform to run that calculation (in fact we will move the function into the PTransform)

This will output a file to the worker's local storage, which will still be in the original sinusoidal projection and because it's only going to be read once in the next step, we use the sparse option (incompatible with some reader software) to help keep file size down


## Reproject the calculated raster

Define a PTransform to warp the sinusoidal calculated tiff into the WGS84 compressed output tiff

In [28]:
class CreateProjectedOutput(beam.PTransform):
    
    def __init__(self, ForceGlobalExtent = False):
        self._forceglobal = ForceGlobalExtent
        
    def warpfile(self, sinusFile):
        cOpts = ["TILED=YES", "BIGTIFF=YES", "COMPRESS=LZW", "NUM_THREADS=ALL_CPUS"]
        if self._forceglobal:
            wo = gdal.WarpOptions(format='GTiff', 
                          outputBounds=[-180, -90, 180, 90], 
                          xRes=1/120.0, yRes=-1/120.0, dstSRS='EPSG:4326',
                          creationOptions=cOpts, multithread=True, dstNodata=-9999, warpMemoryLimit=4096)
        else:
            wo = gdal.WarpOptions(format='GTiff', 
                          xRes=1/120.0, yRes=-1/120.0, dstSRS='EPSG:4326',
                          targetAlignedPixels="YES",
                          creationOptions=cOpts, multithread=True, dstNodata=-9999, warpMemoryLimit=4096)
            
        outname = sinusFile.replace('.sinusoidal', '')
        gdal.Warp(outname, sinusFile, options=wo)
        return outname
    
    def expand(self, pColl):
        return pColl | beam.Map(self.warpfile)


In [29]:
day_bucket_path = OUTPUT_BUCKET_PATH + '/lst_day'
night_bucket_path = OUTPUT_BUCKET_PATH + '/lst_night'

Finally make a PTransform that will upload the output back to the bucket with a mastergrids-formatted filename, then remove the temp files from the worker

In [30]:
class UploadAndClean(beam.DoFn):
    def process(self, finaltif):
        date = os.path.basename(os.path.dirname(finaltif))
        parts = os.path.basename(finaltif).split('.')
        outname = parts[0] + "_Unfilled." + parts[1] + "." + parts[2]+parts[3]+".Data.1km.Data.tif"
        if parts[0].find("Day")>0:
            gsPath = day_bucket_path + "/" + outname
        elif parts[0].find("Night")>0:
            gsPath = night_bucket_path + "/" + outname
        else:
            return
        gcs = GcsIO()
        with gcs.open(gsPath, 'w') as gcsfile, open(finaltif, 'rb') as localfile:
            gcsfile.write(localfile.read())
        os.remove(finaltif)
        yield gsPath
        

Define a class that will hold all the info needed to create a single output image and will become the "things" that comprise the PCollection that runs through the pipeline

In [174]:

class VrtImageInfo():
    # defines the information necessary to create a single virtual mosaic image consisting of 
    # one hdf page url, 
    # one date (corresponding to the given hdf page url(s))
    # one list of tiles (or '*')
    # one hdf layer template, used to build a vrt from the hdf tiles 
    #  retrieved from the hdf page url
    # variable name for the output image e.g. LST_Day, EVI
    
    def __init__(self, hdf_page_url, date, tiles, hdf_layer_template, band_name):
        # items which are fixed at creation according to the job that's being run
        self.HDF_PAGE_URL = hdf_page_url
        self.DATE_STR = date
        self.TILES = tiles
        self.HDF_LYR_TEMPLATE = hdf_layer_template
        self.VRT_VAR_NAME=band_name
        # items which will be populated along the pipeline
        self.HDF_URLS = None
        self.HDF_GS_PATHS = None
        self.HDF_LOCAL_PATHS = None
        self.VRT_LOCAL_PATH = None
        
    def as_tuple(self):
        return (self.DATE_STR, self.VRT_VAR_NAME, self.HDF_LYR_TEMPLATE, self.TILES, self.HDF_PAGE_URL)

    def __str__(self): # just override this to make it easier to ib.show for debugging
        return "{}\n{}\n{}\n|\nDownloaded: {}\nCopied: {}".format(
            self.DATE_STR, self.TILES, self.VRT_VAR_NAME, 
        self.HDF_GS_PATHS is not None, self.HDF_LOCAL_PATHS is not None)

class CalculatedImageInfo():
    # defines the information necessary to create a single calculated output image 
    # from one or more virtual input images (VrtImageInfo)
        def __init__(self, date, input_vrt_objs, calc, min_clip, max_clip, out_var_name, out_type='Float32'):
            self.DATE_STR = date
            self.INPUT_VRT_INFOS = input_vrt_objs
            #self.NAMES_IN_CALC = names_in_calc,
            self.CALC = calc
            self.MIN_CLIP_VAL = min_clip
            self.MAX_CLIP_VAL = max_clip
            self.TIF_VAR_NAME = out_var_name
            self.OUT_TYPE = out_type
            assert len(set([vInfo.DATE_STR for vInfo in input_vrt_objs]))==1
        
        def run_multiband_calculation(self, out_file):
            '''Hacked together from gdal_calc.py. Uses numexpr for slightly more efficient calculation (multithreaded).

            input_singleband_files must be a list of dataset paths pointing to single band rasters (or if they are 
            multiband, the first band will be the one that is read). For multiband rasters a .VRT file could be created 
            to select the needed band, and passed instead. For HDF files, a string specifying the layer name in the dataset 
            will be needed.
            names_in_calc must be a list of the same length as input_singleband_files, specifying the names given to each of 
            those datasets in the calc expression e.g. ["B1", "B2", "B3"]
            out_file should the output dataset name to create
            calc must be a string which will be eval'd against the input data after naming them according to the 
            terms of names_in_calc i.e. with the example above calc could be "B1 * 2.0 + B2 / 3.0 + B3"'''
            import numpy as np, numexpr as ne
            input_datasets = []
            input_bands = []
            input_datatypes = []
            input_datatype_nums = []
            input_ndvs = [] #np.empty(len(input_singleband_files))
            DimensionsCheck = None
            
            input_singleband_files = [v.VRT_LOCAL_PATH for v in self.INPUT_VRT_INFOS]
            names_in_calc = [v.VRT_VAR_NAME for v in self.INPUT_VRT_INFOS]
            calc = self.CALC
            out_type = self.OUT_TYPE
            min_clip = self.MIN_CLIP_VAL
            max_clip = self.MAX_CLIP_VAL
            
            for i, input_file in enumerate(input_singleband_files):
                ds = gdal.Open(input_file, gdal.GA_ReadOnly)
                if not ds:
                    raise IOError("Error opening input file {}".format(input_file))
                input_datasets.append(ds)
                input_datatypes.append(gdal.GetDataTypeName(ds.GetRasterBand(1).DataType))
                input_datatype_nums.append(ds.GetRasterBand(1).DataType)
                input_ndvs.append(ds.GetRasterBand(1).GetNoDataValue())
                if DimensionsCheck:
                    if DimensionsCheck != [ds.RasterXSize, ds.RasterYSize]:
                        raise Exception("Error! Dimensions of file %s (%i, %i) are different from other files (%i, %i).  Cannot proceed" %
                                            (input_file, ds.RasterXSize, ds.RasterYSize, DimensionsCheck[0], DimensionsCheck[1]))
                else:
                    DimensionsCheck = [ds.RasterXSize, ds.RasterYSize]

            if os.path.isfile(out_file):
                os.remove(out_file)
            # gdal_calc imputes the out type like this, but it isn't valid as e.g. two int datasets can result in a float
            # outType = gdal.GetDataTypeName(max(myDataTypeNum))

            #create the output file
            outDriver = gdal.GetDriverByName("GTiff")
            cOpts =  ["TILED=YES", "SPARSE_OK=TRUE", "BLOCKXSIZE=1024", "BLOCKYSIZE=1024", "BIGTIFF=YES", "COMPRESS=LZW", "NUM_THREADS=ALL_CPUS"]
            outDS = outDriver.Create(out_file, DimensionsCheck[0], DimensionsCheck[1], 1, gdal.GetDataTypeByName(out_type), cOpts)
            outDS.SetGeoTransform(input_datasets[0].GetGeoTransform())
            outDS.SetProjection(input_datasets[0].GetProjection())
            DefaultNDVLookup = {'Byte': 255, 'UInt16': 65535, 'Int16': -32767, 'UInt32': 4294967293, 'Int32': -2147483647, 'Float32': 3.402823466E+38, 'Float64': 1.7976931348623158E+308}
            outBand = outDS.GetRasterBand(1)
            outNDV = DefaultNDVLookup[out_type]
            outBand.SetNoDataValue(outNDV)
            outBand = None

            # vrt file reports a block size of 128*128 but the underlying hdf block size is 1200*100
            # so hard code this or some clean multiple : this minimises disk access
            myBlockSize = [4800,4800]
            nXValid = myBlockSize[0]
            nYValid = myBlockSize[1]
            nXBlocks = (int)((DimensionsCheck[0] + myBlockSize[0] - 1) / myBlockSize[0]);
            nYBlocks = (int)((DimensionsCheck[1] + myBlockSize[1] - 1) / myBlockSize[1]);

            for x in range(0, nXBlocks):
                if x == nXBlocks-1:
                    nXValid = DimensionsCheck[0] - x * myBlockSize[0]

                myX = x * myBlockSize[0]

                nYValid = myBlockSize[1]
                myBufSize = nXValid * nYValid

                for y in range(0, nYBlocks):
                    if y == nYBlocks-1:
                        nYValid = DimensionsCheck[1] - y * myBlockSize[1]
                        myBufSize = nXValid * nYValid

                    myY = y * myBlockSize[1]

                    ndv_locs = None #np.zeros(shape = (len(input_datasets), nYValid, nXValid))
                    _data3D = np.empty(shape = (len(input_datasets), nYValid, nXValid), 
                                        dtype = gdal.GetDataTypeName(max(input_datatype_nums)))

                    for i, calc_name in enumerate(names_in_calc):
                        _data3D[i] = input_datasets[i].GetRasterBand(1).ReadAsArray(
                            xoff=myX, yoff=myY, win_xsize=nXValid, win_ysize=nYValid)
                        # make a variable named whatever it was specified to be named in the input
                        # pointing to this band of data in the multiband array. So that the numexpr calc
                        # can point to a variable of the specified name.
                        exec("%s=_data3D[i]" %calc_name)
                        # build a 2d nodata array that's 1 where any of the bands we read are nodata
                        if input_ndvs[i] is not None:
                            if ndv_locs is None:
                                ndv_locs = np.zeros(shape=(nYValid, nXValid))
                            ndv_locs = 1 * np.logical_or(ndv_locs == 1, _data3D[i] == input_ndvs[i])

                    try:
                        result = ne.evaluate(calc)
                    except:
                        print("evaluation of calculation failed: "+calc)
                        raise
                    print(result.shape)
                    if ndv_locs is not None:
                        # apply ndv (set nodata cells to zero then add nodata value to these cells)
                        result = ((1 * (ndv_locs==0)) * result + (outNDV * ndv_locs))
                    print(result.shape)
                    outBand = outDS.GetRasterBand(1)
                    outBand.WriteArray(result, xoff=myX, yoff=myY)
            return out_file

# Demonstration interactive pipeline

Note that for actually running this on dataflow we will need to move the vrt, calculate, reproject, upload steps into a single PTransform. Otherwise they might (will) get run on separate workers with non-shared local storage and bad things

For now, with interactive running, here's a pipeline to put it all together.

First check the test cases are set:

In [32]:
DATES_TO_TEST, TILES_TO_TEST

(['2019.01.25', '2019.02.02', '2020.01.25', '2020.02.02'],
 ['h17v03', 'h18v03', 'h17v04', 'h18v04'])

In [23]:
extratile = TILES_TO_TEST.copy()
extratile.append('h18v05')
extratile

['h17v03', 'h18v03', 'h17v04', 'h18v04', 'h18v05']

Create a PCollection of the required images 


In [195]:
# for the itertools
tiles = [TILES_TO_TEST]

Define some classes to map the requested data products and dates into the output images we want to create. 

Each output image (say EVI for 2020.01.01 or LST_Day for 2019.02.02) will be comprised of one or more input singleband vrt images, defining the raw data layers that go into calculating it. A calculation is also provided to transform from the raw input data to the output. For example to go from integerised and scaled temperature representations in Kelvin as stored in MOD11A2 into the required output (float temperature in celsius). Or to go from the 0-32000 integer values in each of the 7 BRDF bands to a single calculated float value of TCB. Each output image will be one job in the pipeline, represented by a `CalculatedImageInfo` object.

In turn, each singleband vrt image will be comprised of a page to download HDFs from; (eventually) a list of HDF tile files to obtain to make the image (either 1 or 317 per image, depending on MODIS product); and a layer template to extract the required subdataset from each HDF (MOD11A2 HDFs contain several bands / subdatasets; others don't). Each vrt image will be represented by a `VrtImageInfo` object and will provide a list of HDF files for download.

We provide 2 CSV files as input. `bands.csv` contains the necessary info to obtain the downloads for each band (product url and layer template) along with a band identifier. `products.csv` contains the necessary info to construct a `CalculatedImageInfo` for a particular date and region; i.e. the IDs of the bands required and the calculation to extract the needed numbers. For each of these, we make a factory object which will make the image specification objects for the required day and region.

In [133]:
class SingleBandFactory:
    def __init__(self, line):
        self.id, self._product_url, self._layer_template = line.strip().split(',')
        
    def CreateVrtSpecificationForDayAndRegion(self, date, tiles):
        return VrtImageInfo(self._product_url + "/" + date, date, tiles, self._layer_template, self.id)


In [135]:
with open ("../../resources/bands.csv") as f:
    lines = (f.readlines())
BAND_FACTORIES = [SingleBandFactory(l) for l in lines[1:]]    
#v = BAND_FACTORIES[2].CreateVrtSpecification('2020.02.02', '*')

In [175]:
class OutputImageSpecFactory:
    def __init__(self, line):
        var_name, bands_unsplit, calc, min_clip, max_clip = line.split(',')
        bands = bands_unsplit.split(';')
        self.id = var_name
        self._calc = calc
        self._min_clip = min_clip
        self._max_clip = max_clip
        self.vrt_band_factories = [i for i in BAND_FACTORIES if i.id in bands]
        if len(bands) != len(self.vrt_band_factories): 
            # fail if a band was mentioned that wasn't in the bands.csv instructions
            assert False
        
    def CreateOutputImageSpecification(self, date, tiles):
        vrtObjs = [v.CreateVrtSpecificationForDayAndRegion(date, tiles) for v in self.vrt_band_factories]
        return CalculatedImageInfo(date, vrtObjs, self._calc, self._min_clip, self._max_clip, self.id)
                
#def __init__(self, date, input_vrt_objs, calc, min_clip, max_clip, out_var_name, out_type='Float32'):


In [176]:
with open ("../../resources/products.csv") as f:
    lines = (f.readlines())
IMAGE_SPEC_FACTORIES = [OutputImageSpecFactory(l) for l in lines[1:]]
#i  = IMAGE_SPEC_FACTORIES[0].CreateOutputImageSpecification('2020.02.02', '*')

Now we can make one output image specification for every requested product (from products.csv) and every date required, for the given region.
Zip them all together and make an image spec from each factory for each date.

In [199]:
jobs = [t[0].CreateOutputImageSpecification(t[1], t[2]) for t in (itertools.product(IMAGE_SPEC_FACTORIES, DATES_TO_TEST, tiles))]

In [200]:
p = beam.Pipeline(InteractiveRunner())

In [201]:
initialJobsPColl = p | beam.Create(jobs)

In [202]:
ib.show(initialJobsPColl)

Get all the HDF URLs matching the required dates and tiles (regardless of whether they've already been downloaded)

In [75]:
hdf_urls = initialJobsPColl | PopulateHdfUrls()
#ib.show(hdf_urls)

In [76]:
reshuffled_urls = hdf_urls | "Fusion break" >> beam.Reshuffle()

Get the paths to these tiles on the GS bucket - downloading them if they aren't already there. Hopefully the reshuffle will mean that dataflow will scale this up as there may be 317 times more urls to download than there are images

In [77]:
bucket_paths = reshuffled_urls | GetMissingHdfsToBucket(HDF_BUCKET_PATH, username, password)
#ib.show(bucket_paths)

In [78]:
class InflateImInfoWithGsPaths(beam.DoFn):
    def __init__(self):
        pass
    
    def process(self, tpl):
        iminfo = tpl[0]
        gsfiles = tpl[1]
        iminfo.HDF_GS_PATHS = gsfiles
        return [iminfo]

Collate back to one element per output image, collecting all the tile paths to a list then storing it as a field on the ImInfo element

In [79]:
im_info_with_tile_paths = (bucket_paths 
        | "Concatenate info to HDF" >> beam.GroupByKey() 
        | "Insert GSpaths" >> beam.ParDo(InflateImInfoWithGsPaths()))


In [80]:
# need to wait here
#vrts = beam.Pipeline(InteractiveRunner()) | beam.Create(DATES_TO_TEST) | CreateVrtsForDays(hdf_bucket_path)
vrts = im_info_with_tile_paths | BuildVrt()
ib.show(vrts)

In [119]:
uploaded_tiffs = vrts | TranslateVrtToLstTiff() | CreateProjectedOutput() | beam.ParDo(UploadAndClean())

In [120]:
ib.show(uploaded_tiffs)

### Converting it to run on dataflow

The download part is easier, gdal isn't needed

In [245]:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions, SetupOptions
from apache_beam.runners import DataflowRunner
import google.auth

In [246]:
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow will run.
options.view_as(GoogleCloudOptions).region = 'europe-west4'

# Because this notebook comes with a locally built version of the Beam Python SDK, we will need to set
# the sdk_location option for the Dataflow Runner. You will not need to do this if you are using an
# officially released version of Apache Beam.
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
    beam.version.__version__)

In [247]:
dataflow_gcs_location = BUCKET_PATH + "/dataflow"
# Dataflow Staging Location.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Dataflow Temp Location.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

In [248]:
DF_HDF_BUCKET_PATH = BUCKET_PATH + '/hdf_df' #NB not os.path.join as that breaks on windows with backslash paths
DF_OUTPUT_BUCKET_PATH = BUCKET_PATH + "/output_df"

In [249]:
p  = beam.Pipeline(DataflowRunner(), options=options)

In [250]:
date_page_urls = (p | 'dates_to_process' >> beam.Create(DATES_TO_TEST)
                   | 'date_page_urls' >> beam.ParDo(GetDatePageUrl(product_url)))
hdf_urls = date_page_urls | GetHdfUrlsForDate()
#ib.show(hdf_urls)

required_for_download = hdf_urls | check_existing_files(DF_HDF_BUCKET_PATH, TILES_TO_TEST)

downloaded_files = required_for_download | DownloadHdfsToBucket(username, password, DF_HDF_BUCKET_PATH)

download_result = p.run()



In [251]:
from IPython.core.display import display, HTML
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' % 
      (download_result._job.location, download_result._job.id, download_result._job.projectId))
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))

Still need to fix some import locations (import everything within the function where it's used). Not going to take this further in the notebook.

### large functions below here to keep notebook a bit tidier

### Define a function to run a given calculation against a GDAL raster

This is based loosely on gdal_calc.py, it runs in blocks and saves to a sparse output raster. It uses numexpr to allow multithreaded computation of the actual calculation step and it also does multithreaded writing of the output. 

The calculation should be provided as a string with the input data being called 'band_data' e.g. for MODIS LST use "band_data * 0.02 + (-273.15)"
e.g. `run_singleband_calculation("/tmp/2020.02.02/LST_Day.2020.02.02.vrt", "/tmp/2020.02.02/test_celsius_output.tif", "band_data * 0.02 + (-273.15)")`

In [117]:

c2 = "where(arr < 20, 20, (where (arr > 80, 80, arr)))"

In [118]:
def run_singleband_calculation(input_singleband_file, out_file, calc, out_type='Float32'):
    '''Hacked together from gdal_calc.py. Uses numexpr for slightly more efficient calculation (multithreaded).
    
    Calc must be the calculation to apply to the data from input_singleband_file, specified as a string which will be 
    eval'd against the data which will exist in a variable called band_data. i.e. to specify doubling the data then subtracting 
    three, provide calc="(band_data * 2.0) - 3.0"'''
    #input_datasets = []
    #myBands = []
    #myDataType = []
    #myDataTypeNum = []
    #myNDV = []
    import numpy as np, numexpr as ne
    DimensionsCheck = None
    
    ds = gdal.Open(input_singleband_file, gdal.GA_ReadOnly)
    if not ds:
        raise IOError("Error opening input file {}".format(input_file))
    input_dataset = ds
    inputDataType = (gdal.GetDataTypeName(ds.GetRasterBand(1).DataType))
    inputDataTypeNum = (ds.GetRasterBand(1).DataType)
    inputNDV = (ds.GetRasterBand(1).GetNoDataValue())
    
    DimensionsCheck = [ds.RasterXSize, ds.RasterYSize]

    if os.path.isfile(out_file):
        os.remove(out_file)
    # gdal_calc does this but it isn't valid as two int datasets can result in a float!
    # outType = gdal.GetDataTypeName(max(myDataTypeNum))
    
    #create the output file
    outDriver = gdal.GetDriverByName("GTiff")
    cOpts =  ["TILED=YES", "SPARSE_OK=TRUE", "BLOCKXSIZE=1024", "BLOCKYSIZE=1024", "BIGTIFF=YES", "COMPRESS=LZW", "NUM_THREADS=ALL_CPUS"]
    outDS = outDriver.Create(out_file, DimensionsCheck[0], DimensionsCheck[1], 1, gdal.GetDataTypeByName(out_type), cOpts)
    outDS.SetGeoTransform(input_dataset.GetGeoTransform())
    outDS.SetProjection(input_dataset.GetProjection())
    DefaultNDVLookup = {'Byte': 255, 'UInt16': 65535, 'Int16': -32767, 'UInt32': 4294967293, 'Int32': -2147483647, 'Float32': 3.402823466E+38, 'Float64': 1.7976931348623158E+308}
    outBand = outDS.GetRasterBand(1)
    outNDV = DefaultNDVLookup[out_type]
    outBand.SetNoDataValue(outNDV)
    outBand = None
    
    # vrt file reports a block size of 128*128 but the underlying hdf block size is 1200*100
    # so hard code this or some clean multiple : this minimises disk access
    myBlockSize = [4800,4800]
    nXValid = myBlockSize[0]
    nYValid = myBlockSize[1]
    nXBlocks = (int)((DimensionsCheck[0] + myBlockSize[0] - 1) / myBlockSize[0]);
    nYBlocks = (int)((DimensionsCheck[1] + myBlockSize[1] - 1) / myBlockSize[1]);
    
    for x in range(0, nXBlocks):
        if x == nXBlocks-1:
            nXValid = DimensionsCheck[0] - x * myBlockSize[0]
        
        myX = x * myBlockSize[0]
        
        nYValid = myBlockSize[1]
        myBufSize = nXValid * nYValid
        
        for y in range(0, nYBlocks):
            if y == nYBlocks-1:
                nYValid = DimensionsCheck[1] - y * myBlockSize[1]
                myBufSize = nXValid * nYValid
                
            myY = y * myBlockSize[1]
            band_data = input_dataset.GetRasterBand(1).ReadAsArray(xoff=myX, yoff=myY, 
                                                                       win_xsize=nXValid, win_ysize=nYValid)
            nodata_locs = band_data == inputNDV
            
            try:
                result = ne.evaluate(calc)
            except:
                raise
            
            # apply ndv (set nodata cells to zero then add nodata value to these cells)
            result = ((1 * (nodata_locs==0))*result + (outNDV * nodata_locs))
            
            outBand = outDS.GetRasterBand(1)
            outBand.WriteArray(result, xoff=myX, yoff=myY)
    return out_file

# Example of calculation functions:

Both produce a single band output. All input files are single band (or if not then only the first band will be used). The names refer to how many single band input files can be provided. If you have multiband rasters to work on, then first make singleband files from them i.e. .vrt files that select only the relevant band.

Single band: 

`run_singleband_calculation("/tmp/2020.02.02/LST_Night.2020.02.02.vrt", "/tmp/2020.02.02/test_celsius_night.tif", "band_data * 0.02 + (-273.15)")`

Multi band:

`
run_multiband_calculation(["/tmp/2020.02.02/LST_Day.2020.02.02.vrt", "/tmp/2020.02.02/LST_Night.2020.02.02.vrt"], 
                          ["day", "night"], 
                          "/tmp/2020.02.02/test_celsius_diurnaldiff.tif", 
                          "(day * 0.02 + (-273.15)) - (night * 0.02 + (-273.15))")
                          `
                          
single band function is strictly redundant but just a bit easier to use