# Temporal aggregation

This notebook demonstrates how to run temporal aggregations using the Cython library code. The same approach works for other reduction-type aggregations of multiple files into one; it is not specifically temporal.

The core code is written in Cython (`raster_utilities/aggregation/temporal/core/temporal.pyx`), and a helper class (`raster_utilities/aggregation/temporal/temporal_aggregation_runner.py`) is provided to assist with loading the data and passing it to the core function.

This notebook demonstrates how to use the helper class by building the input arguments that it needs and then calling its main run method.

Import the aggregation helper class:

In [None]:
from raster_utilities.aggregation.temporal.TemporalAggregator import TemporalAggregator


In [None]:
import os
from collections import defaultdict
import glob

## 1. Define the aggregations

The "temporal" aggregation is controlled by a dictionary. Each key names a required output aggregation point (a year, a calendar month, etc., or just a single key for a quick summary of "everything"), and each value is the list of files corresponding to that period. For each dictionary item, the files in the list are summarised (flattened) into a single output file for each requested statistic type.

A given input file can appear in more than one output aggregation (e.g. you can have keys for real months and for synoptic months). Each key is processed separately. The exception is when synoptic output is requested: in that case every file mentioned in any of the dictionary items contributes to it, but only once.
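
As a rough illustration of that structure, the sketch below builds a tiny dictionary by hand. The filenames and key strings are made up for the example; only the shape (a string key mapped to a list of file paths) comes from the description above.

```python
# Hypothetical filenames, purely for illustration
jan_2001 = ["LST.2001.001.tif", "LST.2001.009.tif"]
feb_2001 = ["LST.2001.033.tif"]
jan_2002 = ["LST.2002.001.tif"]

file_key = {
    # dynamic ("real") months: one output per year-month
    "LST.2001.01": jan_2001,
    "LST.2001.02": feb_2001,
    "LST.2002.01": jan_2002,
    # synoptic month: every January across all years
    "LST.Synoptic.01": jan_2001 + jan_2002,
}

# The same input file can feed more than one output key
keys_using_file = sorted(
    key for key, files in file_key.items() if "LST.2001.001.tif" in files
)
print(keys_using_file)
```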

Here we show a couple of ways to build that dictionary:

#### Example 1: MODIS 8-daily images to dynamic monthly, dynamic annual, and synoptic monthly outputs:

This cell defines a function that builds the dictionary from a list of MODIS 8-daily filenames by extracting the date from each filename. In one pass it can create keys for any combination of dynamic monthly, dynamic annual, synoptic monthly, and synoptic overall outputs.

Note: for the synoptic overall output we can either add a key to the dictionary that contains all the files (`doSynopticOverall=True`), or ask the aggregator to compute it on the fly. There is no need to do both. The aggregator can only compute it on the fly if it "sees" all the files, so that route is not available with parallel processing.

This could be changed to suit the filename patterns being used and the type of outputs we want (annual, monthly, synoptic monthly).

We can use a `defaultdict` rather than a plain `dict`, which simplifies the loop a bit.
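
To make the difference concrete, here is a minimal sketch of the same grouping loop written both ways. The `(key, file)` pairs are made up for the example.

```python
from collections import defaultdict

# Made-up (key, filename) pairs standing in for parsed filenames
pairs = [("2001", "a.tif"), ("2001", "b.tif"), ("2002", "c.tif")]

# With a plain dict we have to create the list the first time a key appears
plain = {}
for key, fn in pairs:
    if key not in plain:
        plain[key] = []
    plain[key].append(fn)

# With defaultdict(list) the empty list is created automatically on first access
grouped = defaultdict(list)
for key, fn in pairs:
    grouped[key].append(fn)

print(plain == dict(grouped))
```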

The string keys of the dictionary will be used to create the output filenames; we pass in a string "tag" which will be used in conjunction with the date to generate these keys. You might want to alter the strings slightly to make them more informative. 

There is scope to be more efficient here by sorting the dynamic monthly requests and then making a new aggregator for each one, using its synoptic-overall functionality to effectively generate the annual ones for free at the same time as the monthly ones. This would need modification to TemporalAggregator or bespoke handling of the filenames here.

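This version of the function parses filenames that are in the new 6-token filename format (six dot-separated tokens followed by the `tif` extension, of which the second and third hold the year and the three-digit day-of-year):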

In [None]:
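def buildMODISKeyFromMGDailies(tag, fileList, doMonthly=True, doAnnual=True, doSynopticMonthly=True, doSynopticOverall=False):
    # `tag` is accepted so the call matches buildMODISKeyFromDailies; the keys below are built from
    # the variable name embedded in each filename, so the tag does not currently affect the output
    # mapping of day-of-year (the "julian day" in the filename) to the number of the calendar month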
    daymonths = {1:1, 9:1, 17:1, 25:1, 33:2, 41:2, 49:2, 57:2, 65:3, 73:3, 81:3, 89:3, 97:4, 
             105:4, 113:4, 121:5, 129:5, 137:5, 145:5, 153:6, 161:6, 169:6, 177:6, 185:7, 
             193:7, 201:7, 209:7, 217:8, 225:8, 233:8, 241:8, 249:9, 257:9, 265:9, 273:9, 
             281:10, 289:10, 297:10, 305:11, 313:11, 321:11, 329:11, 337:12, 345:12, 353:12, 
             361:12}
             
    processingKey = defaultdict(list)
    fnTemplate = "{}.{}.{}.{}.{}.{}"
    statPlaceholder = "*"
    if tag is not None:
        if not tag.endswith("."):
            tag = tag + "."
    for fn in fileList:
        parts = os.path.basename(fn).split('.')
        assert len(parts)==7
        var, yr, daynum, _, res, spatialstat, tif = parts
        assert len(daynum) == 3
        monthStr = str(daymonths[int(daynum)]).zfill(2)
        if doMonthly:
            outKey = fnTemplate.format(var,yr,monthStr,statPlaceholder,res,spatialstat)
            processingKey[outKey].append(fn)
        if doAnnual:
            outKey = fnTemplate.format(var,yr,"Annual",statPlaceholder,res,spatialstat)
            processingKey[outKey].append(fn)
        if doSynopticMonthly:
            outKey = fnTemplate.format(var,"Synoptic",monthStr,statPlaceholder,res,spatialstat)
            outKeyOverall = fnTemplate.format(var,"Synoptic","Overall",statPlaceholder,res,spatialstat)
            processingKey[outKey].append(fn)
            if doSynopticOverall:
                processingKey[outKeyOverall].append(fn)
    return processingKey
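
The hard-coded `daymonths` table corresponds to a non-leap year. As an alternative sketch (not what the function above does), the same mapping can be derived with `datetime`, which also shows where a leap year would differ. The helper names here are made up for the example.

```python
from datetime import date, timedelta

def day_of_year_to_month(year, doy):
    """Calendar month (1-12) of the given 1-based day-of-year in the given year."""
    return (date(year, 1, 1) + timedelta(days=doy - 1)).month

def build_daymonths(year):
    """Month lookup for the MODIS 8-daily start days 1, 9, ..., 361."""
    return {doy: day_of_year_to_month(year, doy) for doy in range(1, 366, 8)}

non_leap = build_daymonths(2001)
leap = build_daymonths(2000)

# Start days whose calendar month depends on whether the year is a leap year
differs = {doy: (non_leap[doy], leap[doy]) for doy in non_leap if non_leap[doy] != leap[doy]}
print(differs)
```

In a leap year, days 121 and 305 fall on 30 April and 31 October rather than 1 May and 1 November, so whether a year-aware lookup is preferable depends on how those composites should be assigned.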

This version does the same but parses filenames that are in the original format for the MODIS 8-daily grids (e.g. `A2000049_xxx.tif`), where the leading token holds the year and the day-of-year:


In [None]:
def buildMODISKeyFromDailies(tag, fileList, doMonthly=True, doAnnual=True, doSynopticMonthly=True, doSynopticOverall=False):
    #daymonths = {1:1, 9:1, 17:1, 25:1, 33:2, 41:2, 49:2, 57:2, 65:3, 73:3, 81:3, 89:3, 97:4, 
    #         105:4, 113:4, 121:4, 129:5, 137:5, 145:5, 153:6, 161:6, 169:6, 177:6, 185:7, 
    #         193:7, 201:7, 209:7, 217:8, 225:8, 233:8, 241:8, 249:9, 257:9, 265:9, 273:9, 
    #         281:10, 289:10, 297:10, 305:10, 313:11, 321:11, 329:11, 337:12, 345:12, 353:12, 
    #         361:12}
    # mapping of julian day to number of the calendar month
    daymonths = {1:1, 9:1, 17:1, 25:1, 33:2, 41:2, 49:2, 57:2, 65:3, 73:3, 81:3, 89:3, 97:4, 105:4, 113:4, 121:5, 
             129:5, 137:5, 145:5, 153:6, 161:6, 169:6, 177:6, 185:7, 193:7, 201:7, 209:7, 217:8, 225:8, 233:8, 
             241:8, 249:9, 257:9, 265:9, 273:9, 281:10, 289:10, 297:10, 305:11, 313:11, 321:11, 329:11, 337:12, 
             345:12, 353:12, 361:12}
             
    processingKey = defaultdict(list)
    fnTemplate = "{}.{}.{}.{}.{}.{}"
    statPlaceholder = "*"
    res = '1km'
    spatialstat = 'Data'
    for fn in fileList:
        parts = os.path.basename(fn).split('_')
        dateStr = parts[0]
        yr = dateStr[1:5]
        daynum = int(dateStr[5:8])
        monthStr = str(daymonths[daynum]).zfill(2)
        if doMonthly:
            outKey = fnTemplate.format(tag,yr,monthStr,statPlaceholder,res,spatialstat)
            processingKey[outKey].append(fn)
        if doAnnual:
            outKey = fnTemplate.format(tag,yr,"Annual",statPlaceholder,res,spatialstat)
            processingKey[outKey].append(fn)
        if doSynopticMonthly:
            outKey = fnTemplate.format(tag,"Synoptic",monthStr,statPlaceholder,res,spatialstat)
            outKeyOverall = fnTemplate.format(tag,"Synoptic","Overall",statPlaceholder,res,spatialstat)
            processingKey[outKey].append(fn)
            if doSynopticOverall:
                processingKey[outKeyOverall].append(fn)
    return processingKey
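
As a small worked example of the parsing step, the sketch below pulls the year and day-of-year out of the example name `A2000049_xxx.tif` and formats one monthly key with the same template. The helper name is made up, and the month string is written out by hand rather than looked up.

```python
import os

def parse_original_name(path):
    """Return (year, day_of_year) from a name like A2000049_xxx.tif."""
    date_token = os.path.basename(path).split("_")[0]  # "A2000049"
    year = date_token[1:5]                             # characters 1-4: the year
    day_of_year = int(date_token[5:8])                 # characters 5-7: the day-of-year
    return year, day_of_year

year, doy = parse_original_name("A2000049_xxx.tif")

# Same six-slot template as the functions above; day 49 falls in February ("02")
template = "{}.{}.{}.{}.{}.{}"
monthly_key = template.format("LST_Day_v6", year, "02", "*", "1km", "Data")
print(year, doy, monthly_key)
```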



In [None]:
# Then:
# build a dictionary of dynamic monthly and dynamic annual keys from the original-format 8-daily files
inFilePattern1 = r'C:\temp\Gapfilling\LST_Day_mosaic\1km\8-daily\*Data.tif'
inFiles = glob.glob(inFilePattern1)
tag = "LST_Day_v6"
fileKey = buildMODISKeyFromDailies(tag, inFiles, doMonthly=True, doAnnual=True, doSynopticMonthly=False, doSynopticOverall=False)


In [None]:
# or
inFilePattern1 = r'C:\Temp\dataprep\Haiti\TCW_Out\TCW_v6_HTI.*.Data.tif'
inFiles = glob.glob(inFilePattern1)
tag = "TCW_Filled_v6_HTI"
fileKey = buildMODISKeyFromMGDailies(tag, inFiles, doMonthly=True, doAnnual=True, doSynopticMonthly=False, doSynopticOverall=False)


In [None]:
fileKey.keys()
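
Beyond listing the keys, it can help to check how many files landed under each one before running anything. The sketch below is one way to do that; the helper name and the example dictionary are made up. With 8-daily inputs, the lookup table above implies three or four files per calendar month, so other counts would suggest missing or misparsed files.

```python
def summarise_file_key(file_key):
    """Return {key: number of files}, sorted by key, so odd groupings are easy to spot."""
    return {key: len(files) for key, files in sorted(file_key.items())}

# Made-up example in the same shape as the dictionaries built above
example_key = {
    "LST.2001.02.*.1km.Data": ["e.tif", "f.tif", "g.tif", "h.tif"],
    "LST.2001.01.*.1km.Data": ["a.tif", "b.tif", "c.tif", "d.tif"],
    "LST.2001.Annual.*.1km.Data": ["a.tif", "b.tif", "c.tif", "d.tif",
                                   "e.tif", "f.tif", "g.tif", "h.tif"],
}

summary = summarise_file_key(example_key)
for key, n_files in summary.items():
    print(key, n_files)
```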

#### Example 2: balanced means
This cell would build a dictionary with a single key, to create a "balanced" mean from pre-existing synoptic monthly mean files (created using the cell above and subsequently renamed to the 6-token syntax). We don't pass in a tag, we extract the existing one instead.

In [None]:
def buildSynopticBalancedMeanKey(fileList):
    tag = None
    stat = None
    files = []
    for fn in fileList:
        parts = os.path.basename(fn).split(".")
        thistag = parts[0]
        synoptictag = parts[1]
        monthtag = parts[2]
        stattag = parts[3]
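        if tag is None:
            tag = thistag
        # all files must share the same variable tag and be synoptic outputs
        assert tag == thistag, "mixed tags: {} vs {}".format(tag, thistag)
        assert synoptictag == "Synoptic", "not a synoptic file: {}".format(fn)
        try:
            int(monthtag)
        except ValueError:
            continue # the ".Overall" one
        if stat is None:
            stat = stattag
        # all files must hold the same statistic
        assert stat == stattag, "mixed statistics: {} vs {}".format(stat, stattag)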
        files.append(fn)
    outname = tag + "." + "Synoptic.Overall.Balanced-" + stat
    return {outname: files}

inFilePattern = r'C:\Temp\dataprep\EVI\EVI_Unfilled_Synoptic\EVI*.Synoptic.*.mean.*.tif'
inFiles = glob.glob(inFilePattern)
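# keep only the files with a two-digit month token; use the basename so dots in folder names don't matter
inFiles = [f for f in inFiles if len(os.path.basename(f).split('.')[2])==2]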
fileKey = buildSynopticBalancedMeanKey(inFiles)
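
To show why this can differ from an ordinary overall mean, here is a small arithmetic sketch for a single pixel. It assumes that "balanced" means each calendar month carries equal weight regardless of how many observations went into it; the numbers are made up.

```python
# Made-up values for one pixel: the synoptic monthly mean and the number of
# valid observations that went into it
monthly_means = {"01": 10.0, "02": 20.0, "03": 30.0}
monthly_counts = {"01": 1, "02": 1, "03": 8}

# Ordinary overall mean: every observation counts once, so well-observed
# months dominate
total = sum(monthly_means[m] * monthly_counts[m] for m in monthly_means)
weighted_mean = total / sum(monthly_counts.values())

# Balanced mean: the mean of the monthly means, so every month counts once
balanced_mean = sum(monthly_means.values()) / len(monthly_means)

print(weighted_mean, balanced_mean)  # 27.0 20.0
```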

#### Example 3: CHIRPS monthlies to dynamic annual outputs:

In [None]:
def buildBasicKey(fileList):
    processingKey = defaultdict(list)
    for fn in fileList:
        parts = os.path.basename(fn).split('.')
        yr = parts[1]
        outkey = "CHIRPS."+yr
        processingKey[outkey].append(fn)
    return processingKey

#### Example 4: a one-off aggregation
For a one-off job, make a single output by defining the files against a one-key dictionary.

In [None]:
files = glob.glob(r'J:\Temp_Suitability\5k\Pf\monthly_pf\*.2002.*.tif')
fileKey = {"test-2002": files}
fileKey

In [None]:
def monthlyToSynopticAndAnnual(fileList, doSynopticMonthly=True, doAnnual=True, doSynopticOverall=True):
    processingKey = defaultdict(list)
    for fn in fileList:
        parts = os.path.basename(fn).split('.')
        assert len(parts)==7
        var,yr,mth,timeAgg,res,resAgg,tif = parts
        assert len(mth)==2
        assert len(yr)==4
        outKey = ".".join([var, "Synoptic", mth])
        outKeyOverall = ".".join([var, "Synoptic", "Overall"])
        outKeyAnnual = ".".join([var, yr, "Annual"])
        if doSynopticMonthly:
            processingKey[outKey].append(fn)
        if doSynopticOverall:
            processingKey[outKeyOverall].append(fn)
        if doAnnual:
            processingKey[outKeyAnnual].append(fn)
    return processingKey
            
            

In [None]:
files = glob.glob(r'\\map-fs1.ndph.ox.ac.uk\map_data\temp\MODIS_Global\MOD11A2_v6_LST\Air_temp_min\1km\Monthly\*.tif')
fileKey = monthlyToSynopticAndAnnual(files, doSynopticMonthly=True, doAnnual=False, doSynopticOverall=False)

## 2. Other setup

We also need to specify the output folder, the output nodata value, and whether we want to create a synoptic (overall) output too. The synoptic output doubles memory use, so only request it if you need it.

In [None]:
#outDir = r"G:\modis\mcd43b4_v5\TCW_Synoptic_From_5KDaily"
outDir = r"C:\temp\Gapfilling\LST_Day_mosaic\1km\Monthly"
outNDV = -9999
doSynoptic = False
#outDir = r'J:\Temp_Suitability\5k'

Finally we need to specify which stats to calculate; what's appropriate will depend on the data. For rainfall, for example, we just want a sum.
The stats must be specified as a list of values from the `TemporalAggregationStats` class. You can also use `TemporalAggregationStats.ALL`.

In [None]:
from raster_utilities.aggregation.aggregation_values import TemporalAggregationStats

In [None]:
# use the string value of the enums e.g.
#stats = [ 'mean']#, 'count', 'SD', 'max', 'min']
stats = ['mean', 'min', 'max']
# or enum objects e.g.
# stats = [TemporalAggregationStats.MEAN, TemporalAggregationStats.RANGE]
#stats = [TemporalAggregationStats.MEAN, TemporalAggregationStats.MAX, TemporalAggregationStats.MIN, TemporalAggregationStats.SD]

## 3. Running 

Now we just need to instantiate the class and run the aggregation.
The runner should automatically handle splitting the processing into tiles if the files are too large to fit into memory. It currently estimates this by assuming it can use ~40GB RAM, so you might need to tweak it directly. Intermediate processing tiles are not automatically deleted at present.
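
For a feel of the sizes involved, the sketch below does a back-of-envelope tile count. It is not the runner's actual logic, which is not shown here: the function name, the 4 bytes per pixel, and the idea of dividing the input stack by a byte limit are all assumptions, and the real code will also need working arrays for each statistic.

```python
import math

def estimate_tiles(width, height, n_files, bytes_per_pixel=4, bytes_limit=8e9):
    """Rough number of tiles needed so one tile's input stack fits under bytes_limit."""
    total_bytes = width * height * n_files * bytes_per_pixel
    return max(1, math.ceil(total_bytes / bytes_limit))

# Illustrative case: a 43200 x 21600 pixel grid, 46 input files, 4-byte values
n_tiles = estimate_tiles(43200, 21600, 46)
print(n_tiles)  # 22
```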

`doSynoptic` here controls whether the aggregator should produce "grand totals". If we're aggregating a whole cube we can do it this way, or we can simply add an entry to `fileKey` that contains all the files (using the `doSynopticOverall` option of the file key helper functions). The aggregator can only compute grand totals over the files that it sees, so this only works if we are not using multiprocessing. (Note: the aggregator tracks filenames, so it doesn't matter if the same file is passed to it multiple times, e.g. when producing dynamic monthly and dynamic annual outputs.)
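
The "only once" behaviour can be pictured as taking the union of the file lists across all keys. The sketch below illustrates that idea only; it is not the aggregator's implementation, and the helper name and example dictionary are made up.

```python
def unique_files(file_key):
    """Return every file mentioned under any key, each listed once, in first-seen order."""
    seen = set()
    ordered = []
    for files in file_key.values():
        for fn in files:
            if fn not in seen:
                seen.add(fn)
                ordered.append(fn)
    return ordered

# Each file appears under a monthly key and again under the annual key
example_key = {
    "2001.01": ["a.tif", "b.tif"],
    "2001.02": ["c.tif"],
    "2001.Annual": ["a.tif", "b.tif", "c.tif"],
}
grand_total_inputs = unique_files(example_key)
print(grand_total_inputs)
```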

In [None]:
agg = TemporalAggregator(fileKey, outDir, outNDV, stats, doSynoptic, bytesLimit=8e9)

In [None]:
agg.RunAggregation()

### Stitching the tiles

The aggregator should do this automatically, but if it fails for some reason you can do something like this to stitch the tiles:

In [None]:
import subprocess as subp


In [None]:
for stat in agg.stats:
    for timeKey in agg._timePoints():
        wildcard = "{}.{}.*.tif".format(timeKey,stat.value)
        tiles = glob.glob(os.path.join(agg._tileFolder, wildcard))
        tilesStr = " ".join(tiles)
        tifName = wildcard.replace("*.", "1km.Data.")
        vrtName = tifName.replace(".tif",".vrt")
        vrtFile = os.path.join(outDir,vrtName)
        tifFile = os.path.join(outDir,tifName)
        #print (vrtName)
        vrtCommand = "gdalbuildvrt {} {}".format(vrtFile, tilesStr)
        print(vrtCommand)
        !{vrtCommand}
        transCommand = "gdal_translate -of GTiff -co COMPRESS=LZW "+\
            "-co PREDICTOR=2 -co TILED=YES -co BIGTIFF=YES " +\
            "-co NUM_THREADS=ALL_CPUS --config GDAL_CACHEMAX 8000 {} {}".format(
            vrtFile, tifFile)
        ovCommand = "gdaladdo -ro --config COMPRESS_OVERVIEW LZW --config USE_RRD NO "+\
            "--config TILED YES --config GDAL_CACHEMAX 8000 {} 2 4 8 16 32 64 128 256".format(
            tifFile)
        statCommand = "gdalinfo -stats {}>nul".format(tifFile)
        print(transCommand)
        #subp.check_call(str(cline), shell=True)
        subp.check_call(transCommand, shell=True)
        print(ovCommand)
        subp.check_call(ovCommand, shell=True)
        print(statCommand)
        subp.check_call(statCommand, shell=True)
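
The commands above are built as single strings and run through the shell, which breaks if a path contains spaces. One alternative, sketched below, is to build each command as a list of arguments and pass it to `subprocess.check_call` without `shell=True`. The helper name is made up, and the example only constructs the lists so that it runs without GDAL installed.

```python
def build_stitch_commands(tiles, vrt_file, tif_file):
    """Return (gdalbuildvrt, gdal_translate) commands as argument lists."""
    vrt_cmd = ["gdalbuildvrt", vrt_file] + list(tiles)
    translate_cmd = [
        "gdal_translate", "-of", "GTiff",
        "-co", "COMPRESS=LZW", "-co", "PREDICTOR=2",
        "-co", "TILED=YES", "-co", "BIGTIFF=YES",
        vrt_file, tif_file,
    ]
    return vrt_cmd, translate_cmd

# Paths containing spaces need no quoting because each argument is a separate list item
tiles = [r"C:\temp\my tiles\out.mean.0.tif", r"C:\temp\my tiles\out.mean.1.tif"]
vrt_cmd, translate_cmd = build_stitch_commands(tiles, "out.mean.vrt", "out.mean.tif")
print(vrt_cmd)
print(translate_cmd)

# To run them (requires the GDAL command-line tools on the PATH):
#   import subprocess
#   subprocess.check_call(vrt_cmd)
#   subprocess.check_call(translate_cmd)
```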
        

## 4. Parallel processing
We can do multiple items from the fileKey (i.e. multiple output aggregations) in parallel, with the following caveats:
- each one still takes a lot of memory, so we can probably run fewer processes than we could with the spatial aggregator, and it doesn't make sense on a desktop machine
- in fact the code here won't even work on IPython on Windows: the caller function would need to be in a separate .py file (see the spatial aggregator notebook for an example)
- we can't do an on-the-fly overall synoptic, as each aggregator only gets a subset of the files; we have to do a single overall one, which will probably take 50% of the time of all the others (monthly + annual) put together
- the processing is already multithreaded so we're only speeding up the i/o parts
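
Each worker receives a single output key, so the dictionary has to be split into one-key dictionaries first. The sketch below shows one way to do that. Ordering the jobs so that the largest one (typically an overall key) starts first is a suggestion rather than something the original code does, and the helper name and example dictionary are made up.

```python
def split_file_key(file_key):
    """Return a list of one-key dictionaries, largest file list first."""
    items = sorted(file_key.items(), key=lambda kv: len(kv[1]), reverse=True)
    return [{key: files} for key, files in items]

# Made-up example: two monthly keys plus an overall key that holds every file
example_key = {
    "2001.01": ["a.tif", "b.tif"],
    "2001.02": ["c.tif"],
    "Synoptic.Overall": ["a.tif", "b.tif", "c.tif"],
}
jobs = split_file_key(example_key)
for job in jobs:
    print(job)
```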

In [None]:

from multiprocessing import Pool

def callAgg(kvp):
    # kvp is one (key, fileList) item from fileKey; each worker aggregates a single output key
    try:
        oneKeyDict = {kvp[0]:kvp[1]}
        agg = TemporalAggregator(oneKeyDict, outDir, outNDV, stats, False, bytesLimit=50e9)
        agg.RunAggregation()
    except KeyboardInterrupt:
        pass
        
def runMulti():
    pool = Pool(8)
    p = pool.map_async(callAgg, fileKey.items())
    try:
        r = p.get(0xFFFF)
    except KeyboardInterrupt:
        print ("oops")
        return


In [None]:
runMulti()