# Content

This IPython notebook contains a Python / Cython implementation of code used to gap-fill images derived from the MODIS data catalogue, specifically LST Day / Night, EVI, TCB, and TCW.

The original algorithms used were developed by Dan Weiss and are described in the following paper:

Weiss, D.J., Atkinson, P.M., Bhatt, S., Mappin, B., Hay, S.I. & Gething, P.W. (2014) An effective approach for gap-filling continental scale remotely sensed time-series. ISPRS Journal of Photogrammetry and Remote Sensing, 98, 106-118

[doi:10.1016/j.isprsjprs.2014.10.001](http://dx.doi.org/10.1016/j.isprsjprs.2014.10.001)

The code herein is a reimplementation of the Weiss code (which was written in IDL) by Harry Gibson, also at the [Malaria Atlas Project](http://www.map.ox.ac.uk/), Oxford. 

The core algorithms are unchanged from the published paper but substantial improvements in efficiency have been made enabling the (more effective but more computationally demanding) A1 algorithm to be used for larger (thus, more) gaps, whilst still allowing gapfilling of 8-daily global 1km MODIS data stacks to be undertaken in a reasonable period. Running with 20 cores on a Blade server, total runtime on a 15-year 8-daily 1km global stack was in the region of 5-6 days.

# Usage

All the code including the algorithms and calls necessary to run them on a series of images is included in this notebook. This means that the Cython code can be compiled and run very easily, although it does mean that the notebook is rather long! 

Cells containing code to configure the run come first. These should be adjusted to suit your data and needs, and executed. 

Next comes a cell showing how the algorithms should actually be called. Before running this cell, you should first run all the cells containing the actual model code and utility functions - these come below the header "Operational Code". 

# License

<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons Licence" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

# Implementation notes

## Memory

Although the algorithms have been made more efficient, a high-specification machine with plenty of RAM is still required to fill global images. 

We can run despeckle and A1 in slices that fit into memory, given the need for A1 to have a stack of (currently) 15 calendar-day images, plus the same of outputs and distances and flags. 

We can run a slice of approximately 10k pixels width * full height on a 64Gb machine - but A2 needs more than this as it cannot be run by slices and in the current implementation needs all 8 passes to be kept in order to calculate median values at each point (using means would get around this). 

Writing the intermediates to disk is a substantial overhead when there is not much other free memory available, but unavoidable. 

Also the OS does not free the memory until the disk has actually written it, if disk write caching is enabled. This means it can be beneficial to have a slightly compressed format for the intermediates, else A2 will not have as much memory as it should have. 15 days * 43200 * 21600 pixels * 4 bytes per pixel * 2.25 (out,dists,flags) = 117Gb!

**Bottom line - for global 1k fills a machine with at least 100Gb RAM would be recommended!**

We actually ran on MAP's blade servers (64 cores, 500Gb RAM). It could therefore run without saving intermediate data to disk. For A1 this would require memory for the full 15 year stack plus 8 A2 directional images, and mean and std; this would be in the region of 100Gb.

In fact I ran with memory use set to ~70Gb which meant that the A1 processed by slices with intermediate files; this did not prove significantly slower (presumably the OS cached those intermediate files) but kept the machine more usable for others.


## Differences from published algorithms

Despite having been parallelised and substantially optimised, the despeckle and fill algorithms are fundamentally mostly unchanged from those developed by Dan Weiss and described in Weiss et al. Specific changes that were made are noted below.

### Fill value calculation approach

In the gapfilling, we originally (in the published paper) calculated ratios between values occurring at different times.

However this method only makes sense if the scale is ratiometric. 
For temperature in celsius that's not the case: 10 degrees is not meaningfully twice
as hot as five degrees. Five degrees is not infinitely warmer than zero degrees.
Minus five degrees is not minus one times as warm as five degrees. 

For temperature we can use an absolute scale of kelvin: the higher numbers means that 
the ratio is going to be closer to 1 but it's multiplied by a bigger alternate value
too so it is justifiable.

EVI is an absolute (ratio) scale, but we need to handle the case when
either point is zero or very close to it, so we need to define a maximum plausible ratio
so if a point is 0.01 one year then 0.9 another year we don't bring in a huge multiplier of the 
alternate value point.

For tasseled cap brightness the coefficients are all positive, so treat as EVI. 

For tc wetness, the values the values can be positive or negative. But I think we 
need to transform these to make them all positive as it's not really an absolute 
scale; the numbers don't really mean anything; so TCW needs to be handled like LST 
but specify a max ratio too.

The alternative is to examine the _difference_ between values rather than the _ratio_ in the algorithms. This is safer, and the only real assumption is that the scale is linear (or it could be linearised). 

The code in this notebook offers both options; the difference method was used for all the MODIS global covariates.

### A2 pass averaging

In the published paper, A2 ran 8 directional passes and selected the median value of the results at each location. In this implementation A2 can use either the mean or the median value at each location. Whilst the mean more sensitive to outliers (which is why the median was originally used) this was not observed to be a problem in global tests and the mean was less prone to visual banding. Additionally as a mean can be computed on-the-fly, A2 could be made more memory efficient as the 8 passes do not all need to be stored in memory.

### Search ratios and thresholds

In generating our global data series we ran A1 with a larger search radius than was used in the published paper. This was possible due to the faster implementation. This means that more gap pixels were filled with A1 than previously.

### Mean / standard deviation reference data

In the original algorithm, despeckle and A2 fills are run by reference to a single all-time "synoptic" image for mean and standard deviation values. This was also the case here for most data, but the code has been adapted to allow a "stack" of mean and standard deviation images, such as one for each calendar month. This was used in despeckling EVI and other BRDF-derived imagery.

## Other

I have occasionally observed what seems to be an obscure linux kernel bug that can lead to the "migration" process taking almost all CPU time and nothing actually getting done during the multithreaded portions of the gapfill code. I have worked around this by setting CPU affinity for the threads I'm using. 

But, this needs to be done separately for each IPython kernel else, if running two gapfill sessions, they'll compete for the same cores.

So assuming a 64-core machine, for one session use say cores 0 - 20 and then a different 20 for another session, leaving the OS free to schedule other people's work on the remaining cores.

In [None]:
# os.environ['GOMP_CPU_AFFINITY'] = '1-25'

# or get rid of that if not needed
# os.unsetenv('GOMP_CPU_AFFINITY')
# os.environ.pop('GOMP_CPU_AFFINITY')

# Begin Code

## Imports / required modules

In [None]:
import gdal
import numpy as np
import os
import itertools
import time
import glob
import rasterio
import scipy.ndimage
import shutil
import bottleneck as bn
import gc
import numexpr as ne
import math

In [None]:
%load_ext cython

## Configuration of flags and thresholds

In [None]:
# Should the fill be based on the difference between neighbour values or the ratio between them?
# Ratio only works for ratiometric variables such as temperature in Kelvin
_FILL_BY_RATIO = 0


If we are running by ratio then set appropriate values if necessary to put the values into a "more absolute" scale e.g. to turn celsius into kelvin. Also set a max ratio in case there are values close to zero (like with EVI).

In [None]:
# LST
# value that could be subtracted from the data to put them into a close-enough ratio scale
_DATA_ZERO_OFFSET = -273.15 # just put the values into kelvin which is a ratio scale
_MAX_ALLOWABLE_RATIO = 2.0 # will never be seen with LST in kelvin

# EVI
#_DATA_ZERO_OFFSET = 0.0 # EVI is an absolute scale anyway
#_MAX_ALLOWABLE_RATIO = 5.0 # but values close to 0 will give large ratio, so define a limit

# TCB 
# TCB is an arbitrary unit but is always positive, so treat as an absolute scale
#_DATA_ZERO_OFFSET = 0.0 # EVI is an absolute scale anyway
#_MAX_ALLOWABLE_RATIO = 10.0 # be a bit more lax as i guess it varies more

# TCW 
# TCW is also an arbitrary unit but can be negative or positive so calculate ratios on shifted values
#_DATA_ZERO_OFFSET = 3.0 # TCW 
#_MAX_ALLOWABLE_RATIO = 5.0


#### Other data "fixes"?

In [None]:
# if the data were converted wrong from HDF then fix that here: i mistakenly made the celsius grids by subtracting 273.0 from the 
# original kelvin data so need to remove another 0.15
#_DATA_CORRECTION_OFFSET = -0.15
_DATA_CORRECTION_OFFSET = 0.0

#### Fill algorithm thresholds and distances

In [None]:
_TRIM_MIN_MAX = 1
# Should the fill values be clipped?
_CLIP_TO_LIMITS = 1


In [None]:
# Hard upper / lower limits that the fill values should be clipped to
# 1-0 for EVI, 100 - -100 for LST
#_DATA_UPPER_LIMIT = 1.0
#_DATA_LOWER_LIMIT = 0.0
#_DATA_UPPER_LIMIT = 2.0
#_DATA_LOWER_LIMIT = -1.0

_DATA_UPPER_LIMIT = 100.0
_DATA_LOWER_LIMIT = -100.0

#### Despeckle thresholds

In [None]:
# Number of stds beyond the mean beyond which a fill value will be clipped
_FLOOR_CEILING_VALUE = 2.58
# Number of stds beyond the mean beyond which a value will be unconditionally discarded in despeckle
_EXTREME_VALUE_THRESHOLD = 2.58
# Number of stds beyond the mean beyond which a value may be discarded in despeckly, depending on neighborhood similarity
_SPECKLE_THRESHOLD = 1.64 #1.96
# Max difference in Z-score between a potential speckle and the average of its neighbours, for it 
# to be accepted as not being a speckle
_SPECKLE_NBR_Z_THRESH = 0.2

#### Spiral search distance / density (-> search radius)

In [None]:
# How many cells should the spiral search in despeckle check up to (unless 
# it finds enough nbrs beforehand)
# effective search radius is therefore sqrt(value / pi)
_DESPECKLE_SEARCH_NBRS = 3142 #31420
# How many cells must be found within the above figure to succeed?
_DESPECKLE_MIN_USED_NBRS = 320 #1600
# How many cells will we go up to before we stop searching, if we find them
# before the end of the spiral search is reached?
_DESPECKLE_MAX_USED_NBRS = 640 #3200

# How many cells should the spiral search in A1 check up to (unless it finds 
# enough nbrs beforehand)
# A1 search radius is therefore approx sqrt (value / pi)
_A1_SEARCH_NBRS = 3142
# At least how many cells must be found within above num nbrs to succeed?
# Bear in mind that the spiral will run (n years - 1) times and this value
# applies to the entire search in total
_A1_MIN_USED_NBRS = 480# 320#40
# Stop the spiral search when we have found how many nbrs?
_A1_MAX_USED_NBRS = 960#640#80

#### A1 memory use (-> should we process by tile?)

In [None]:
# How much memory can A1 use? If necessary A1 will be run in slices so it can fit the stack 
# within the available memory. 
# A2 can't do this and will take what it needs, which is around 70Gb for global 1k data. 
# So not much point setting this to less than that.
# 190e9 will ensure A1 processes entirely in memory 
#totalAvailMem = 190e9
totalAvailMem = 40e9

In [None]:
# Method for saving A1 intermediate data if necessary (can be TABLES or TIFF, use TIFF)
_SAVEMETHOD = "TIFF"

#### A2 configuration

In [None]:
# Should we run A2, or only A1 (leaving gaps where A1 fails)
_RUN_A2 = 1

# Which of the 8 passes should A2 choose? the mean or median of the non-nan results?
_A2_PASS_SELECTOR = "MEAN" # or MEDIAN

#### Flag values

In [None]:
# Flag values to use. They go into an 8 bit bitmask value and are set/unset with 
# bitwise operators so don't change these without thought!!
_OCEAN_FLAG = 1
_FILL_FAILED_FLAG = 2 # indicates that whatever fill algorithm ran last failed
_EXTREME_FLAG = 4
_SPECKLE_FLAG = 8
_A1_FILLED_FLAG = 16
_A1_FILL_WAS_FULL_FLAG = 32
_A2_SUCCESS_FLAG = 64
_MINMAX_CLIP_FLAG = 128


## AOI for gapfill and output

In [None]:
def CalculateAOI(WestLimit, EastLimit, NorthLimit, SouthLimit):
    ''' Gets the pixel coordinates on a global 30 arcsecond image for the given bbox in degrees'''
    resolution = 0.008333333333333 # 12 threes
    assert EastLimit > WestLimit
    assert NorthLimit > SouthLimit
    
    x0 = int((WestLimit - -180) / resolution)
    x1 = int(((EastLimit - -180) / resolution)+0.5)
    y0 = int((90 - NorthLimit) / resolution)
    y1 = int(((90 - SouthLimit) / resolution) + 0.5)
    
    return ((x0,x1),(y0,y1))

In [None]:
# Do not adjust these : they are the size of the WGS84 global 1km tiffs
sourceDataHeight = 21600
sourceDataWidth = 43200

We assume that the input files are all 1km global. We may not want to gapfill the whole lot. Configure that here: use CalculateAOI() to get the necessary values based on lat / long boxes e.g.

In [None]:
CalculateAOI(-2,40,15,-10)

In [None]:
# adjust these as shown below if a non-global AOI is required
xLims = (0,43200)
yLims = (0,21600)

# Mountainous EVI shadow speckles area
#xLims = (29160, 31560)
#yLims = (5400, 7200)

# Eastern China dodgy LST test area
#xLims = (33600,36600)
#yLims = (6000,9000)

#Africa (Same as original algorithm cubes):
#xLims = (18467, 18467+12532)
#yLims = (6000, 6000+9600)

#African cloudy Random Test Area, 2-12W, 12-2N
#xLims = (21840,23040)
#yLims = (9660,10560)

# All but Antarctica:
#xLims = (0,43200)
#yLims = (0, 18000)

# E8 nations
#xLims = (22800, 26640)
#yLims = (11280, 15120)

# Haiti
# xLims = (12600, 13080)
# yLims = (8280, 8760)

# Testing
xLims = (21360, 26400)
yLims = (9000, 12000)

## Data sources configuration

#### Configure the wildcard path to the input data files, and the synoptic OR stacked mean and std files

In [None]:
# strPattern gives the wildcarded path to the input data files, where {} can be replaced 
# with a calendar day number to list all tif files of that calendar day e.g. A2014009_Day.tif
# means_Synoptic_Path should be the "balanced mean" file i.e. the mean of the monthly means
# std_Synoptic_Path should be the daily sd file, i.e. the s.d. of the individual days

# *** EVI ***
#strPattern = r'E:\MCD43B4\MCD43B4_Indices\EVI\*{}_*.tif'
#means_Synoptic_Path = r'G:\NewStats\EVI_Mean_From_Monthly.tif'
#std_Synoptic_Path = r'G:\NewStats\EVI_SD_From_Daily.tif'

# *** LST DAY ***
#strPattern = r'F:\MOD11A2\MOD11A2_Data\Day\*{}_*.tif'
#means_Synoptic_Path = r'G:\NewStats\LST_Day_Mean_From_Monthly.tif'
#std_Synoptic_Path = r'G:\NewStats\LST_Day_SD_From_Daily.tif'

# *** LST NIGHT ***
#strPattern = r'F:\MOD11A2\MOD11A2_Data\Night\*{}_*.tif'
#means_Synoptic_Path = r'G:\NewStats\LST_Night_Mean_From_Monthly.tif'
#std_Synoptic_Path = r'G:\NewStats\LST_Night_SD_From_Daily.tif'

#means_Synoptic_Path = '/home/ZOO/zool1301/Gapfilling/Stats/LST_Day_Month_1_Mean.tif'
#std_Synoptic_Path = '/home/ZOO/zool1301/Gapfilling/Stats/LST_Day_Month_1_SD.tif'

# *** TCW ***
#strPattern = '/home/ZOO/zool1301/Gapfilling/TCW/Input/*{}_*.tif'
#means_Synoptic_Path = '/home/ZOO/zool1301/Gapfilling/Stats/TCW_Mean_From_Monthly.tif'
#std_Synoptic_Path = '/home/ZOO/zool1301/Gapfilling/Stats/TCW_SD_From_Daily.tif'

# *** TCB ***
# strPattern = r'L:\TCW_v6_Unfilled\1km\8-Daily\*{}_TCW.tif'
# means_Synoptic_Path = r'L:\TCW_v6_Unfilled\1km\Synoptic\TCW_Unfilled_V6.Synoptic.Overall.Balanced-mean.1km.Data.tif'
# std_Synoptic_Path = r'L:\TCW_v6_Unfilled\1km\Synoptic\TCW_Unfilled_V6.Synoptic.Overall.SD.1km.Data.tif'

# *** TESTING ***
strPattern = r'G:\Shared drives\MODIS-unfilled\LST_Day_v6\1km\8-Daily\*{}_LST_Day*.tif'
means_Synoptic_Path = r'G:\Shared drives\MODIS-unfilled\LST_Day_v6\1km\Synoptic\LST_Day_v6_Unfilled.Synoptic.Overall.Balanced-mean.mean.1km.tif'
std_Synoptic_Path = r'G:\Shared drives\MODIS-unfilled\LST_Day_v6\1km\Synoptic\LST_Day_v6_Unfilled.Synoptic.Overall.SD.1km.Data.tif'

means_Stack_Path = None
std_Stack_Path = None
stdsPreloaded=None

#### Configure the path to the land/sea mask dataset

In [None]:

coast_Path = r'G:\Shared drives\MAP MODIS Covariates\MODIS_Global\CoastGlobal.tiff'

#### Configure the output and temp directories

In [None]:
# Temp dir should be on C or whatever disk is fastest and have ~200Gb or more free space
#tempDir = os.path.join(os.environ['TEMP'],"GapfillTemp")
tempDir = r'C:\Temp\LST_Test_NB\Intermediate'
outDir = r'C:\Temp\LST_Test_NB'

# END OF CONFIG SECTION

# Calculated values

Values in this section will be calcualted based on previous inputs, or are hard-coded

In [None]:
# always round up as there's no harm done in passing slightly too much data
_A1_SEARCH_RADIUS = int(math.sqrt (_A1_SEARCH_NBRS / 3.14) + 1)
_DESPECKLE_SEARCH_RADIUS = int(math.sqrt (_DESPECKLE_SEARCH_NBRS / 3.14) + 1)

_TOTAL_REQ_MARGIN = _A1_SEARCH_RADIUS + _DESPECKLE_SEARCH_RADIUS


In [None]:
dayMonths = {1:1, 9:1, 17:1, 25:1, 33:2, 41:2, 49:2, 57:2, 65:3, 73:3, 81:3, 89:3, 97:4, 105:4, 113:4, 121:4, 
             129:5, 137:5, 145:5, 153:6, 161:6, 169:6, 177:6, 185:7, 193:7, 201:7, 209:7, 217:8, 225:8, 233:8, 
             241:8, 249:9, 257:9, 265:9, 273:9, 281:10, 289:10, 297:10, 305:10, 313:11, 321:11, 329:11, 337:12, 
             345:12, 353:12, 361:12}

In [None]:
calendarDays = [str(i).zfill(3) for i in np.arange(1,365,8)]

In [None]:
FlagValues = {
         "OCEAN" : _OCEAN_FLAG,
         "FAILURE" : _FILL_FAILED_FLAG,
         "SPECKLE" : _SPECKLE_FLAG,
         "EXTREME" : _EXTREME_FLAG,
         "A1_FILLED" : _A1_FILLED_FLAG,
         "A1_FULL" : _A1_FILL_WAS_FULL_FLAG,
         "A2_FILLED" : _A2_SUCCESS_FLAG,
         "CLIPPED" : _MINMAX_CLIP_FLAG
         }
DespeckleSpiralConfig = {
        "MAX_NBRS_TO_SEARCH" : _DESPECKLE_SEARCH_NBRS,
        "MIN_NBRS_REQUIRED" : _DESPECKLE_MIN_USED_NBRS,
        "MAX_NBRS_REQUIRED" : _DESPECKLE_MAX_USED_NBRS
        }
DespeckleLimitsConfig = {
        "HARD_UPPER_LIMIT" : _DATA_UPPER_LIMIT,
        "HARD_LOWER_LIMIT" : _DATA_LOWER_LIMIT,
        "INVALID_BEYOND_STDS": _EXTREME_VALUE_THRESHOLD,
        "SPECKLE_BEYOND_STDS": _SPECKLE_THRESHOLD,
        "ZSCORE_ACCEPTANCE_STDS": _SPECKLE_NBR_Z_THRESH
        }
A1SpiralConfig = {
        "MAX_NBRS_TO_SEARCH" : _A1_SEARCH_NBRS,
        "MIN_NBRS_REQUIRED" : _A1_MIN_USED_NBRS,
        "MAX_NBRS_REQUIRED" : _A1_MAX_USED_NBRS
        }



#### Set AOI for calculation

In [None]:
totalFillHeight = yLims[1] - yLims[0]
totalFillWidth = xLims[1] - xLims[0]
reqDataL = max(0, xLims[0] - _TOTAL_REQ_MARGIN)
reqDataR = min(xLims[1] + _TOTAL_REQ_MARGIN, sourceDataWidth)
reqDataT = max(0, yLims[0] - _TOTAL_REQ_MARGIN)
reqDataB = min(yLims[1] + _TOTAL_REQ_MARGIN, sourceDataHeight)
reqDataHeight = reqDataB - reqDataT
reqDataWidth = reqDataR - reqDataL
totalMarginL = xLims[0] - reqDataL
totalMarginR = reqDataR - xLims[1]
totalMarginT = yLims[0] - reqDataT
totalMarginB = reqDataB - yLims[1]

#### Calculate feasible slice size for A1, based on above inputs

In [None]:
dataBPP = 4
outputsBPP = dataBPP*2 + 1 # the dists, output, flags
zSize = 15 # number of years we have (at most)
sliceSqrd = totalAvailMem / (zSize * (dataBPP+outputsBPP))
sliceXSize = sliceSqrd / reqDataHeight 
#adjust = sliceXSize % 256


#### Calculate geotransform for the outputs (if non global)

In [None]:
topLeftLong = -180.0
if xLims[0] != 0:
    topLeftLong += xLims[0] * 0.008333333333333
topLeftLat = 90
if yLims[0] != 0:
    topLeftLat -= yLims[0] * 0.008333333333333
outputGT = (topLeftLong, 0.008333333333333, 0.0, topLeftLat, 0.0, -0.008333333333333)

In [None]:
outputGT

In [None]:
sliceXSize

### Calculate the slices by which A1 will be run

In [None]:
# generate the slice boundaries, slicing only in the x dimension 
# (we are using full data in y dimension for now, though this doesn't have to be so)
# The slice boundaries overlap by 2* _SEARCH_RADIUS pixels (_SEARCH_RADIUS on each slice)
# This allows the gap-filling code to run on the non-overlapping section of the slice 
# while having all the data necessary to fill a gap up to the edge of that non-overlapping 
# section

nchunks = int((totalFillWidth // sliceXSize) + 1)
# generate the "chunk" boundaries that will represent the data processed in one thread's job. 
chunkedges = np.linspace(xLims[0], xLims[1], nchunks+1).astype(np.int32)
leftRealEdges = chunkedges[:-1]
rightRealEdges = chunkedges[1:]
left_A1_edges = np.clip((chunkedges - _A1_SEARCH_RADIUS)[:-1], 0, np.inf).astype(np.int32)
right_A1_edges = np.clip((chunkedges + _A1_SEARCH_RADIUS)[1:], -np.inf, sourceDataWidth).astype(np.int32)
left_Despeckle_edges = np.clip((chunkedges - _TOTAL_REQ_MARGIN)[:-1], 0, np.inf).astype(np.int32)
right_Despeckle_edges = np.clip((chunkedges + _TOTAL_REQ_MARGIN)[1:], -np.inf, sourceDataWidth).astype(np.int32)

# the left and right task boundaries are _SEARCH_RADIUS bigger than the data that will be searched 
# within them so that all pixels can have neighbours, if possible (not at global edge)
x_offsets_overlapping = zip(left_Despeckle_edges, left_A1_edges, leftRealEdges, 
                            rightRealEdges, right_A1_edges, right_Despeckle_edges)

# can we pad the top and bottom? (not if we are doing a global run as y-slicing isn't implemented)
topA1Edge = np.clip(yLims[0]-_A1_SEARCH_RADIUS, 0, np.inf).astype(np.int32)
bottomA1Edge = np.clip(yLims[1]+_A1_SEARCH_RADIUS, -np.inf, sourceDataHeight).astype(np.int32)
topDespeckleEdge = np.clip(yLims[0]-_TOTAL_REQ_MARGIN, 0, np.inf).astype(np.int32)
bottomDespeckleEdge = np.clip(yLims[1]+_TOTAL_REQ_MARGIN, -np.inf, sourceDataHeight).astype(np.int32)

In [None]:
# Generate the list of tasks that are needed to run the overall process: 
# A1 slice-by-slice and then A2 when each calendar day is complete
a1Tasklist = list(itertools.product(
    calendarDays,
    x_offsets_overlapping,
    [(topDespeckleEdge, topA1Edge, yLims[0], yLims[1], bottomA1Edge, bottomDespeckleEdge)]))

In [None]:
# Alternatively restart if somebody turned the server off on you
# restartTaskList = [t for t in a1Tasklist if int(t[0])<49]

# or to run some extra data 
# extendTaskList = [t for t in a1Tasklist if 0 < int(t[0]) < 154]

testTaskList = [t for t in a1Tasklist if int(t[0]) == 361]

In [None]:
_USE_INTERMEDIATE_FILES = sliceXSize < totalFillWidth

In [None]:
_USE_INTERMEDIATE_FILES

In [None]:
templateDS = gdal.Open(means_Synoptic_Path)
bnd = templateDS.GetRasterBand(1)
globalGT = templateDS.GetGeoTransform()
globalProj = templateDS.GetProjection()
templateDS = None


In [None]:
sourceFileList = []
cdDataMemMapFile = ""
cdDistsMemMapFile = ""
cdFlagsMemMapFile = ""
currentMeans = None
currentStds = None

# Main Run command

Use these cells to actually run the gapfilling, once all the cells below and above have been successfully executed

In [None]:
startFillAt=2018000

In [None]:
# Kick everything off
global sourceFileList
global currentMeans
global currentMonthNumber

# PICK WHAT TO DO!
#myTaskList = oneTask
#myTaskList = day305
myTaskList = testTaskList
#myTaskList = restartTaskList
#myTaskList = day009
#myTaskList = day193

starttime = time.time()
for sliceId in range(len(myTaskList)): 
    thisSlice = myTaskList[sliceId]
    if sliceId == 0 or myTaskList[sliceId-1][0]!=thisSlice[0]:
        #this is the first slice for this calendar day.
        # find out the file names involved and load them into a filthy global
        sourceFileList = sorted(glob.glob(strPattern.format(thisSlice[0])))
        currentMonthNumber = dayMonths[int(thisSlice[0])]
    despeckleAndA1Caller(thisSlice, startFillAt)
    if sliceId == len(myTaskList) - 1 or myTaskList[sliceId+1][0] != thisSlice[0]:
        # We assume that the tasklist is sorted by calendar day and detect when it is time to run A2 
        # by the change in day num.
        # this was the last slice for this calendar day. We are ready to run A2 and save out the 
        # outputs
        a2Caller(None, startFillAt) # use e.g. a2Caller('A2008') to only run A2 on and save out the file for one year of the CD
        
print ("All done in {0!s} seconds!".format(time.time()-starttime))

# Operational code: Load all cells below this point

In [None]:
# not implemented!
def logMsg(stringMsg, terminateLine=True):
    if terminateLine:
        pass
        #print stringMsg
    else:
        pass
        #print stringMsg,

### A1 Caller

In [None]:
def despeckleAndA1Caller(sliceInfo, startFillAt=None):
    '''
    Run Despeckle and A1 for a given slice of a calendar day (all years, same day in each).
    
    Calculates all the margins and other information necessary to call the despeckle and A1 cython functions for 
    a given slice or global image.
    
    Save outputs into uncompressed tiff or tables file (untested) that have the size of the total area
    being processed
    '''
    global sourceFileList
    global cdDataMemMapFile
    global cdDistsMemMapFile
    global cdFlagsMemMapFile
   
    global _NDV
    global _MINMAX_CLIP_FLAG
    
    dayNum = sliceInfo[0]
    xInfo = sliceInfo[1]
    yInfo = sliceInfo[2]
    monthNum = dayMonths[int(sliceInfo[0])]
    
    # Warning headfuck ensues with regard to which data has which margins!...:
    # 
    # A1 needs a margin outside the area it is filling, if possible 
    # (i.e. if it's not a global image, which it isn't because we wouldn't have enough mem for that, and
    # where this slice isn't at the true edge i.e. 180deg E/W or 90deg N/S)
    # 
    # Despeckle needs a further margin outside of that so that A1 gets margin data that has itself been 
    # despeckled.
    # A sliceInfo is structured like this:
    # ('017',
    # (21720, 21740, 21840, 23040, 23140, 23160),
    # (9540, 9560, 9660, 10560, 10660, 10680))
    
    # Work out where the slice sits within global coordinates, prefixed with g_
    g_sliceDespeckleL = xInfo[0]     # global coords i.e. relative to res of full raster
    g_sliceA1L = xInfo[1]            # global coords
    g_sliceFillL = xInfo[2]          # global coords
    g_sliceFillR = xInfo[3]          # global coords
    g_sliceA1R = xInfo[4]            # global coords
    g_sliceDespeckleR = xInfo[5]     # global coords
    
    g_sliceDespeckleT = yInfo[0]     # global coords
    g_sliceA1T = yInfo[1]            # global coords
    g_sliceFillT = yInfo[2]          # global coords
    g_sliceFillB = yInfo[3]          # global coords
    g_sliceA1B = yInfo[4]            # global coords
    g_sliceDespeckleB = yInfo[5]     # global coords
    
    # the total width that will be gapfilled and written out
    sliceFillWidth = g_sliceFillR - g_sliceFillL
    # the total width of data we need, equal to reqDataWidth if only 1 slice
    sliceDespeckleWidth = g_sliceDespeckleR - g_sliceDespeckleL 
    # the total width that will be passed to a1, not actually needed
    sliceA1Width = g_sliceA1R - g_sliceA1L 
    
    # as above but for height, equal to reqDataHeight if only 1 slice
    sliceFillHeight = g_sliceFillB - g_sliceFillT
    sliceDespeckleHeight = g_sliceDespeckleB - g_sliceDespeckleT 
    sliceA1Height = g_sliceA1B - g_sliceA1T
    assert sliceFillHeight == totalFillHeight # not actually supporting y slices for now...
    
    # how big a margin does despeckle actually have to work with? (as at the global edges,
    # a margin is not possible)
    sliceDespeckleMarginL = int(g_sliceA1L - g_sliceDespeckleL)
    sliceDespeckleMarginR = int(g_sliceDespeckleR - g_sliceA1R)
    sliceDespeckleMarginT = int(g_sliceA1T - g_sliceDespeckleT)
    sliceDespeckleMarginB = int(g_sliceDespeckleB - g_sliceA1B)
    assert ((sliceDespeckleMarginL >= 0) and (sliceDespeckleMarginR >= 0) 
            and (sliceDespeckleMarginR >= 0) and (sliceDespeckleMarginB >= 0))
    
    # how big a margin does A1 actually have to work with? (As at the global edges,
    # a margin is not possible)
    sliceA1MarginL = g_sliceFillL - g_sliceA1L
    sliceA1MarginR = g_sliceA1R - g_sliceFillR
    sliceA1MarginT = g_sliceFillT - g_sliceA1T
    sliceA1MarginB = g_sliceA1B - g_sliceFillB
    assert ((sliceA1MarginL >= 0) and (sliceA1MarginR >= 0) 
            and (sliceA1MarginR >= 0) and (sliceA1MarginB >= 0))
    
    # the coords within the input data that can be used, where the output (filled) 
    # data will sit
    sliceTotalMarginT = int(sliceA1MarginT + sliceDespeckleMarginT)
    sliceTotalMarginB = int(sliceA1MarginB + sliceDespeckleMarginB)
    sliceTotalMarginL = int(sliceA1MarginL + sliceDespeckleMarginL)
    sliceTotalMarginR = int(sliceA1MarginR + sliceDespeckleMarginR)
    
    # if we are not running the whole globe, our output files do not have 
    # global pixel coords but local ones relative to the total processing size
    # E.g. "Africa" starts at global X of 18467 but this translates to X of zero in the output image
    # Hence get the coords of this slice within the output space coordiantes, prefixed with out_
    # - i.e. where the output slice fits into the overall output image
    out_SliceFillL = g_sliceFillL - xLims[0] 
    out_SliceFillR = out_SliceFillL + sliceFillWidth 
    out_SliceFillT = g_sliceFillT - yLims[0]
    out_SliceFillB = out_SliceFillT + sliceFillHeight
    # and as we aren't yet supporting slices in the vertical dimension:
    assert out_SliceFillT == 0
    
    # The the means / stds we read in have a different coordinate space again: it's the same as the 
    # output, but with the overall (total) margins added. These give the coordinates of the output image 
    # within the space of the data that is read in
    in_SliceDataL = out_SliceFillL + totalMarginL
    in_SliceDataR = in_SliceDataL + sliceFillWidth
    #in_SliceDataR = in_SliceDataL +  sliceFillWidth + sliceTotalMarginR #sliceDespeckleWidth # out_SliceFillR + totalMarginL
    in_SliceDataT = out_SliceFillT + totalMarginT
    in_SliceDataB = in_SliceDataT + sliceFillHeight
    #in_SliceDataB = in_SliceDataT + sliceFillHeight + sliceTotalMarginB #sliceDespeckleHeight # out_SliceFillB + totalMarginT
    
    # And this finally gives the coords of the slice of the mean /std in its own space
    #processDataL = max(0,processFillL - sliceTotalMarginL)
    #processDataR = processDataL + sliceDespeckleWidth
    
    print ("************ BEGIN SLICE **************")
    print ("Processing slice of global coords T:{0!s}, B:{1!s}, L:{2!s}, R:{3!s}"
           .format(g_sliceFillT,g_sliceFillB,g_sliceFillL,g_sliceFillR))
    print ("using data of calendar day {0!s}, global coords {1!s} - {2!s} ({3!s} years)"
           .format(dayNum, g_sliceDespeckleL, g_sliceDespeckleR, len(sourceFileList)))
    
    print ("Filling output slice {0!s} - {1!s} from overall required fill width of {2!s}"
           .format( out_SliceFillL, out_SliceFillR, totalFillWidth))
    print ("Loading data from global coords {0!s} - {1!s}..."
           .format(g_sliceDespeckleL,g_sliceDespeckleR))
    print ("...into overall process coords of {0!s} - {1!s} within process data width of {2!s}"
           .format(in_SliceDataL, in_SliceDataR, reqDataWidth))
     
    if not startFillAt:
        stackFillFromPos=0
    else:
        stackFillFromPos=-1
        for y in range (len(sourceFileList)):
            fileDate = int(os.path.basename(sourceFileList[y]).split('_')[0][1:])
            if startFillAt and fileDate>=startFillAt:
                stackFillFromPos = y
                break
        if stackFillFromPos == -1:
            print ("nothing to do")
            return
            
 
    start_time = time.time()
    try:
        stackData = np.empty(shape = (len(sourceFileList), sliceDespeckleHeight, sliceDespeckleWidth), 
                          dtype = 'Float32')
    except MemoryError:
        print("Failed to allocate sufficient memory for array of size {0!s}x{1!s}x{2!s}".format(
            len(sourceFileList), sliceDespeckleHeight, sliceDespeckleWidth))
        raise
    floatFileProps = None
    byteFileProps = None
    _NDV = None
 
    # read the source files into a calendar-day stack
    
    for y in range (len(sourceFileList)):
        with rasterio.open(sourceFileList[y]) as src:
            logMsg(".", False)
            stackData[y] = src.read(1,window=((g_sliceDespeckleT, g_sliceDespeckleB),
                                                    (g_sliceDespeckleL, g_sliceDespeckleR)),
                                          masked=False)
            testNDV = src.nodatavals[0]
            if _NDV is None:
                _NDV = testNDV
            else:
                assert _NDV == testNDV
            if not floatFileProps:
                floatFileProps = src.meta
            if not byteFileProps:
                byteFileProps = src.meta
    assert stackData.flags.c_contiguous
    print ("Stack will be filled from position {0!s}".format(stackFillFromPos))
    #assert False
    # read the means for the required 
    with rasterio.open(means_Synoptic_Path) as src:
        sliceMeanSynoptic = src.read(1, window=((g_sliceDespeckleT, g_sliceDespeckleB),
                                                    (g_sliceDespeckleL, g_sliceDespeckleR)), 
                                  masked=False)
    
    with rasterio.open(std_Synoptic_Path) as src:
        sliceStdSynoptic = src.read(1, window=((g_sliceDespeckleT, g_sliceDespeckleB),
                                                    (g_sliceDespeckleL, g_sliceDespeckleR)), 
                                 masked=False)

    if means_Stack_Path is not None:
        with rasterio.open(means_Stack_Path) as src:
            sliceMeanMonthly = src.read(monthNum, window=((g_sliceDespeckleT, g_sliceDespeckleB),
                                                    (g_sliceDespeckleL, g_sliceDespeckleR)), 
                                 masked=False)
    else:
        sliceMeanMonthly = sliceMeanSynoptic
        
    if std_Stack_Path is not None:
        with rasterio.open(std_Stack_Path) as src:
            sliceStdMonthly = src.read(monthNum, window=((g_sliceDespeckleT, g_sliceDespeckleB),
                                                    (g_sliceDespeckleL, g_sliceDespeckleR)), 
                                 masked=False)
    else:
        sliceStdMonthly = sliceStdSynoptic
        
    with rasterio.open(coast_Path) as src:
        sliceCoast = src.read(1, window=((g_sliceDespeckleT, g_sliceDespeckleB),
                                                    (g_sliceDespeckleL, g_sliceDespeckleR)),  
                                   masked=False)
    
    flags = np.zeros(shape = (len(sourceFileList),sliceDespeckleHeight,sliceDespeckleWidth), dtype=np.uint8)
    
    tDelta = time.time()
    print ("Data loaded in {0!s} seconds.".format(
        tDelta - start_time))
    print ("Despeckle margins are T:{0!s}, B:{1!s}, L:{2!s}, R:{3!s}".format(
        sliceDespeckleMarginT, sliceDespeckleMarginB, sliceDespeckleMarginL, sliceDespeckleMarginR))
  
        
    # despeckle, initialise flags, and clip to coastline
    #leleFlags(threadData,flags,
    #                sliceMean,sliceStd,
    #                sliceCoast, _NDV, _DESPECKLE_SEARCH_NBRS,
    #                _OCEAN_FLAG, _FILL_FAILED_FLAG, _SPECKLE_FLAG, _SPECKLE_EXTREME_FLAG,
    #                _EXTREME_VALUE_THRESHOLD, _SPECKLE_THRESHOLD,
    #                _DATA_UPPER_LIMIT, _DATA_LOWER_LIMIT,
    #                _DATA_CORRECTION_OFFSET,
    #                sliceDespeckleMarginT, sliceDespeckleMarginB,
    #                sliceDespeckleMarginL, sliceDespeckleMarginR
    #                #,totalMarginT, totalMarginB, totalMarginL, totalMarginR
    #                )
    dictDataStacks = {
                        "Data"            : stackData,
                        "Flags"           : flags,
                        "Means"           : sliceMeanMonthly,
                        "Stds"            : sliceStdMonthly,
                        "LandMask"        : sliceCoast,
                        "DistTemplate"    : None,
                        "KnownUnfillable" : None
                        }
    despeckleMargins = {
                        "TOP"    : sliceDespeckleMarginT,
                        "BOTTOM" : sliceDespeckleMarginB,
                        "LEFT"   : sliceDespeckleMarginL,
                        "RIGHT"  : sliceDespeckleMarginR
                        }
    a1Margins = {
                        "TOP"    : sliceTotalMarginT,
                        "BOTTOM" : sliceTotalMarginB,
                        "LEFT"   : sliceTotalMarginL,
                        "RIGHT"  : sliceTotalMarginR
                        }
    
    deSpeckleRes = setSpeckleFlags (dictDataStacks, 
                     FlagValues, 
                     DespeckleSpiralConfig, 
                     DespeckleLimitsConfig,
                     despeckleMargins,
                     _NDV,
                     _DATA_CORRECTION_OFFSET
                     )
    #setSpeckleFlagsOld ( stackData, flags, sliceMean, sliceStd, sliceCoast,
    #                    _NDV, FlagValues, DespeckleSpiralConfig, DespeckleLimitsConfig,
    #                    _DATA_CORRECTION_OFFSET, 
    #                    sliceDespeckleMarginT, sliceDespeckleMarginB, sliceDespeckleMarginL, sliceDespeckleMarginR
    #                    )
    dictDataStacks["Data"] = deSpeckleRes[0]
    dictDataStacks["Flags"] = deSpeckleRes[1]
    
    print ("Despeckled calendar day {0!s}, slice {1!s} - {2!s}, in {3!s} seconds.".format(
        dayNum, g_sliceDespeckleL, g_sliceDespeckleR, time.time() - tDelta))
    tDelta = time.time()
    
    # Run A1! threadData and flags were modified in place by despeckle function
    print ("A1 Margins are T:{0!s}, B:{1!s}, L:{2!s}, R:{3!s}".format(sliceTotalMarginT, sliceTotalMarginB, 
                                                                     sliceTotalMarginL, sliceTotalMarginR))
    #result = fillGapsA1_cy_Additive (threadData, flags,
    #                                    _OCEAN_FLAG, _FILL_FAILED_FLAG, _A1_FILLED_FLAG, _A1_FILL_WAS_FULL_FLAG,
    #                                    _NDV,# _DATA_ZERO_OFFSET, _MAX_ALLOWABLE_RATIO,
    #                                    sliceTotalMarginT, sliceTotalMarginB,
    #                                    sliceTotalMarginL, sliceTotalMarginR)           
    
    result = fillGapsA1_Cy (
                        dictDataStacks,
                        FlagValues,
                        A1SpiralConfig,
                        a1Margins,
                        _NDV,
                        _FILL_BY_RATIO,
                        _DATA_ZERO_OFFSET,
                        _MAX_ALLOWABLE_RATIO,
                        stackFillFromPos
                        )
    # Result contains (output,dists,new flags, waffle), 
    # all exccept waffle being of size actualHeight*actualWidth
    del dictDataStacks
    del stackData
    del flags
    
    print ("Ran A1 in {0!s} seconds.".format(
        time.time() - tDelta))
    tDelta = time.time()                      
    print ("Results shape = "+str(result[0].shape))
    print ("Flags shape = "+str(result[1].shape))
    
    # Clip the filled values to min / max as a multiple of stds from the mean
    if _CLIP_TO_LIMITS:
        sliceMeanSynoptic = np.copy(sliceMeanSynoptic[sliceTotalMarginT:sliceTotalMarginT+sliceFillHeight,
                                     sliceTotalMarginL:sliceTotalMarginL+sliceFillWidth])
   
        sliceStdSynoptic = np.copy(sliceStdSynoptic[sliceTotalMarginT:sliceTotalMarginT+sliceFillHeight,
                                      sliceTotalMarginL:sliceTotalMarginL+sliceFillWidth])
        MinMaxClip3D(result[0], result[2], sliceMeanSynoptic, sliceStdSynoptic,  
                   _A1_FILLED_FLAG, _MINMAX_CLIP_FLAG, _FLOOR_CEILING_VALUE, _NDV, 
                   _DATA_UPPER_LIMIT, _DATA_LOWER_LIMIT)

    if not _USE_INTERMEDIATE_FILES:
        global _INTERMEDIATE_DATA
        global _INTERMEDIATE_FLAGS
        global _INTERMEDIATE_DISTS
        _INTERMEDIATE_DATA = np.asarray(result[0])
        _INTERMEDIATE_DISTS = np.asarray(result[1])
        _INTERMEDIATE_FLAGS = np.asarray(result[2])
        
    elif _SAVEMETHOD == "TIFF":
        outDrv = gdal.GetDriverByName('GTiff')
        #globalGT = meansRasterStacked.GetGeoTransform()
        #globalProj = meansRasterStacked.GetProjection()
        #globResult = result
        dataFNTemplate = r"{0!s}{1!s}{2!s}{3!s}{4!s}"
        for y in range (len(sourceFileList)):
            if y < stackFillFromPos:
                continue
            print ("Saving intermediate file at stack position "+str(y))
            inFileName = os.path.basename(sourceFileList[y]).split(os.extsep)[0]
            dataFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateData_",inFileName,".tif")
            distFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateDists_",inFileName,".tif")
            flagFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateFlags_",inFileName,".tif")
            dataRaster = None
            distRaster = None
            flagRaster = None
            # no compression, no faffing with geotransforms: this is just saving a sparse array to disk
            if os.path.exists(dataFN):
                # this is not the first slice for this CD
                dataRaster = gdal.Open(dataFN, gdal.GA_Update)
            else:
                dataRaster = outDrv.Create(dataFN,totalFillWidth,totalFillHeight,1,gdal.GDT_Float32,
                                           ["TILED=YES", "SPARSE_OK=TRUE", "BIGTIFF=YES"])
            if os.path.exists(distFN):
                distRaster = gdal.Open(distFN, gdal.GA_Update)
            else:
                distRaster = outDrv.Create(distFN,totalFillWidth,totalFillHeight,1,gdal.GDT_Float32,
                                           ["TILED=YES","SPARSE_OK=TRUE", "BIGTIFF=YES"])

            if os.path.exists(flagFN):
                flagRaster = gdal.Open(flagFN,  gdal.GA_Update)
            else:
                flagRaster = outDrv.Create(flagFN,totalFillWidth,totalFillHeight,1,gdal.GDT_Byte,
                                           ["TILED=YES","SPARSE_OK=TRUE", "BIGTIFF=YES"])
            bnd = dataRaster.GetRasterBand(1)
            assert bnd is not None
            assert dataRaster is not None
            bnd.WriteArray(np.asarray(result[0][y]), out_SliceFillL, out_SliceFillT)
           
            bnd = distRaster.GetRasterBand(1)
            assert bnd is not None
            assert distRaster is not None
            bnd.WriteArray(np.asarray(result[1][y]), out_SliceFillL, out_SliceFillT)
           
            bnd = flagRaster.GetRasterBand(1)
            assert bnd is not None
            assert flagRaster is not None
            bnd.WriteArray(np.asarray(result[2][y]), out_SliceFillL, out_SliceFillT)
            
            #ensure flush
            bnd = None
            dataRaster = None
            distRaster = None
            flagRaster = None
    
    else:
        # Save to a memory-mapped array - basic uncompressed format
        #cdDataMemMapFile = os.path.join(tempDir, "A1_DataMemMap_CD{0}.mmap".format(str(dayNum)))
        cdDataMemMapFile = os.path.join(tempDir, "A1_FullMemMap_CD{0}.h5".format(str(dayNum)))
        # create the file on the first slice of each CD, else append
        dataMode = 'w' if not os.path.exists(cdDataMemMapFile) else 'r+'

        with tables.openFile(cdDataMemMapFile, dataMode) as dataTable:
            
            if dataMode == 'w':
                filt = tables.Filters(complevel=1,complib='zlib',shuffle=False)
                chunkshape = (1, 1048576/sliceFillWidth, sliceFillWidth)
                dataMemMap = dataTable.createCArray(
                    dataTable.root, 'IntermediateData',
                    atom=tables.Float32Atom(),
                    filters = filt,
                    chunkshape = chunkshape,
                    shape = (len(sourceFileList), totalFillHeight, totalFillWidth))
                distMemMap = dataTable.createCArray(
                    dataTable.root, 'IntermediateDists',
                    atom=tables.Float32Atom(), 
                    filters = filt,
                    chunkshape = chunkshape,
                shape = (len(sourceFileList), totalFillHeight, totalFillWidth))
                flagMemMap = dataTable.createCArray(
                    dataTable.root, 'IntermediateFlags',
                    atom=tables.UInt8Atom(), 
                    filters = filt,
                    chunkshape = chunkshape,
                    shape = (len(sourceFileList), totalFillHeight, totalFillWidth))
            else:
                dataMemMap = dataTable.root.IntermediateData
                distMemMap = dataTable.root.IntermediateDists
                flagMemMap = dataTable.root.IntermediateFlags

            #dataMemMap = np.memmap(cdDataMemMapFile,
             #                      dtype='Float32',
              #                     shape=(len(sourceFileList), totalFillHeight, totalFillWidth),
              #                     mode=dataMode)
            
            # we have asserted that actual height == totalheight and thus that we aren't running slices by 
            # latitude so don't bother specifying y coords
            dataMemMap[:,:,out_SliceFillL:out_SliceFillR] = np.asarray(result[0])[:]
            distMemMap[:,:,out_SliceFillL:out_SliceFillR] = np.asarray(result[1])[:]
            flagMemMap[:,:,out_SliceFillL:out_SliceFillR] = np.asarray(result[2])[:]
            dataTable.close()

    #del result
    logMsg ("Wrote outputs in {0!s} seconds.".format(
        time.time() - tDelta))
   # tDelta = time.time()
    logMsg ("Section {0!s} done in {1!s} seconds.".format(
        sliceInfo, time.time() - start_time))
    
    

### A2 Caller

In [None]:
def a2Caller(whichYearToDo = None, startFillAt = None):
    # Load each intermediate image that has been built up by running A1 in slices
    # Run A2 on it, if required, and save it to the output, compressed tiffs
    global sourceFileList
    global cdDataMemMapFile
    global cdDistsMemMapFile
    global cdFlagsMemMapFile
    global currentMeans
    global _NDV
    global _RUN_A2
    global stdsPreloaded
    
    # get means
    src = gdal.Open(means_Synoptic_Path)
    bnd = src.GetRasterBand(1)
    meanImageWhole = bnd.ReadAsArray(xLims[0],yLims[0],totalFillWidth,totalFillHeight)
    start_time = time.time()
    
    # the global sourceFileList contains all files of the current calendar date.
    # we may or may not want to run a2 on all of these
    for i in range (len(sourceFileList)):
        if whichYearToDo is not None:
            # whichFileToDo simply provides the ability to only run a2 on and write out one year from the 
            # stack, for quicker testing
            if sourceFileList[i].find(whichYearToDo) == -1:
                continue
        elif startFillAt is not None:
            # alternatively startFillAt provides the ability to run a2 for all files after a specific date,
            # for updating the cube
            fileDate = int(os.path.basename(sourceFileList[i]).split('_')[0][1:])
            if fileDate < startFillAt:
                continue
            
        if not _USE_INTERMEDIATE_FILES:
            # A1 ran entirely in memory so just grab a reference to the filthy global
            global _INTERMEDIATE_DATA
            global _INTERMEDIATE_FLAGS
            global _INTERMEDIATE_DISTS
            fullDayData = _INTERMEDIATE_DATA[i]
            fullDayDist = _INTERMEDIATE_DISTS[i]
            fullDayFlags = _INTERMEDIATE_FLAGS[i]
            
        # load the image into memory from the mem-map or uncompressed tiff
        elif _SAVEMETHOD=="TIFF":
            dataFNTemplate = r"{0!s}{1!s}{2!s}{3!s}{4!s}"
            inFileName = os.path.basename(sourceFileList[i]).split(os.extsep)[0]
            dataFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateData_",inFileName,".tif")
            distFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateDists_",inFileName,".tif")
            flagFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateFlags_",inFileName,".tif")
            ds = gdal.Open(dataFN)
            bnd = ds.GetRasterBand(1)
            fullDayData = bnd.ReadAsArray()
            ds = gdal.Open(distFN)
            bnd = ds.GetRasterBand(1)
            fullDayDist = bnd.ReadAsArray()
            ds = gdal.Open(flagFN)
            bnd = ds.GetRasterBand(1)
            fullDayFlags = bnd.ReadAsArray()
            ds = None
            bnd = None
        
        else:
            with tables.openFile(cdDataMemMapFile, 'r') as dataTable:
                dataMemMapStack = dataTable.root.IntermediateData
                distMemMapStack = dataTable.root.IntermediateDists
                flagMemMapStack = dataTable.root.IntermediateFlags

                fullDayData = np.copy(dataMemMapStack[i])
                fullDayDist = np.copy(distMemMapStack[i])
                fullDayFlags = np.copy(flagMemMapStack[i])
                
                del dataMemMapStack
                del distMemMapStack
                del flagMemMapStack
   
        if _RUN_A2:
            # run the 8 A2 passes, which operate on the data in-place
            A2Runner_1by1(fullDayData, fullDayFlags, fullDayDist, meanImageWhole)
        
        # A2_Runner_1by1 does the min-max clip
        
        #ds = gdal.Open(r'G:\MOD11A2\MOD11A2_Data\{0}\A2001001_LST_{0}.tif'.format(metric))
        outDrv = gdal.GetDriverByName('GTiff')
        
        outFNBase = os.path.basename(sourceFileList[i]).split('.')[0]
        outFNBaseTemplate = "{0}_{1}.tif"
        outFNTemplate = os.path.join(outDir,outFNBaseTemplate)
        print ("Saving to")
        print (outFNTemplate.format(outFNBase,"Filled_Data"))
        outRaster = outDrv.Create(outFNTemplate.format(outFNBase,"Filled_Data"),
                                  totalFillWidth,totalFillHeight,1,gdal.GDT_Float32,
                                  ["COMPRESS=LZW","TILED=YES","BIGTIFF=YES","NUM_THREADS=ALL_CPUS"])
        outRaster.SetGeoTransform(outputGT)
        outRaster.SetProjection(globalProj)
        bnd = outRaster.GetRasterBand(1)
        bnd.SetNoDataValue(_NDV)
        # WRITE THE DATA!
        bnd.WriteArray(fullDayData)
        bnd.FlushCache()
        outRaster.FlushCache()
        del bnd
        outRaster = None

        outRaster = outDrv.Create(outFNTemplate.format(outFNBase,"Fill_Dists"),
                                  totalFillWidth,totalFillHeight,1,gdal.GDT_Float32,
                                  ["COMPRESS=LZW","TILED=YES","NUM_THREADS=ALL_CPUS","BIGTIFF=YES"])
        outRaster.SetGeoTransform(outputGT)
        outRaster.SetProjection(globalProj)
        bnd = outRaster.GetRasterBand(1)
        bnd.SetNoDataValue(_NDV)
        # WRITE THE DATA!
        bnd.WriteArray(fullDayDist)
        bnd.FlushCache()
        outRaster.FlushCache()
        del bnd
        outRaster = None
        
        outRaster = outDrv.Create(outFNTemplate.format(outFNBase,"Fill_Flags"),
                                  totalFillWidth,totalFillHeight,1,gdal.GDT_Byte,
                                  ["COMPRESS=LZW","TILED=YES","NUM_THREADS=ALL_CPUS","BIGTIFF=YES"])
        outRaster.SetGeoTransform(outputGT)
        outRaster.SetProjection(globalProj)
        bnd = outRaster.GetRasterBand(1)
        bnd.SetNoDataValue(_NDV)
        # WRITE THE DATA!
        bnd.WriteArray(fullDayFlags)
        bnd.FlushCache()
        outRaster.FlushCache()
        del bnd
        outRaster = None
        
    print (time.time()-start_time)
    
    # Delete the intermediate files if they exist (they won't exist prior to the fill-from date
    # if one was used by A1)
    if _SAVEMETHOD=="TIFF" and _USE_INTERMEDIATE_FILES:
        for i in range(len(sourceFileList)):
            dataFNTemplate = r"{0!s}{1!s}{2!s}{3!s}{4!s}"
            inFileName = os.path.basename(sourceFileList[i]).split(os.extsep)[0]
            dataFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateData_",inFileName,".tif")
            distFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateDists_",inFileName,".tif")
            flagFN = dataFNTemplate.format(tempDir,os.path.sep,"A1IntermediateFlags_",inFileName,".tif")
            if os.path.exists(dataFN):
                os.remove(dataFN)
            if os.path.exists(distFN):
                os.remove(distFN)
            if os.path.exists(flagFN):
                os.remove(flagFN)
                
    elif _USE_INTERMEDIATE_FILES:
        os.remove(cdDataMemMapFile)
    

### A2 multi-pass runner

In [None]:
# Runs the 8 passes of A2 for a single image
def A2Runner_1by1(dataImageIn, flagsImageIn, distImageIn, meanImageIn):
    
    global stdsPreloaded
    assert dataImageIn.shape == flagsImageIn.shape
    assert flagsImageIn.shape == distImageIn.shape
       
    assert distImageIn.shape == meanImageIn.shape
    
    start_time = time.time()
    
    #(dataImageIn + distImageIn + meanImageIn + dataPassStack*8 + sumDistImage + distImageLocal + ratioImageLocal)*4 + flagsImageIn
    # = 57 bytes per pixel required, 933.1M pixels, ~50Gb RAM required. 
    # So A2 for 1km global Should _just_ be ok on a 64Gb machine! However in practice it gets memory errors because 
    # by the time it runs the intermediate data from the last slice of A1 has not been fully flushed to disk by the OS.
    # If we have to, then we could reread data or dist images between passes, or discard the pass results and keep a single 
    # running mean from the passes, if we are using a mean rather than median selector.
    
    dataPassStack = np.repeat(dataImageIn[np.newaxis, :, :], 8, axis=0) # gets modified in place
    
    sumDistImage = np.zeros_like(dataImageIn) # gets incremented in place
    global _NDV
    
    # we're not tinkering with this one, A2 will always run with 8 neighbours
    _A2_MAX_NBRS = 8
    
    # The A2 cython code just runs in the same order each time on what it gets (y outer, x inner, 0-n).
    # To get the 8 different directional passes we re-stride the inputs here, so that this causes them 
    # to be passed over in each of the directions. This means that some passes are _much_ slower than 
    # others as they aren't accessing the memory in the native contiguous order, but it saves us 
    # making a physical copy each time
    for passNumber in range(0, 8):
        print ("Running A2 pass "+str(passNumber))
        tDelta = time.time()
        # these will not be modified by A2, so they just get re-strided and passed in without copying
        dataGlobalImage = None
        flagsGlobalImage = None
        distGlobalImage = None
        meanGlobalImage = None
        distImageCopy = np.copy(distImageIn)
        # these ones get modified by A2, so they get restrided each time
        dataPassImage = None
        # this one gets modified by A2 and is a running total of all passes so does not get reset
        sumDistPassImage = sumDistImage
        outerLoop=0
        if passNumber == 0: # pass "a" takes ~50s for a global image
            dataGlobalImage = dataImageIn.T
            flagsGlobalImage = flagsImageIn.T
            distGlobalImage = distImageCopy.T
            meanGlobalImage = meanImageIn.T
            sumDistPassImage = sumDistImage.T
            dataPassImage = dataPassStack[passNumber].T
            outerLoop=1
        elif passNumber == 1: # pass "b" ~50s
            dataGlobalImage = dataImageIn.T[:,::-1]
            flagsGlobalImage = flagsImageIn.T[:,::-1]
            distGlobalImage = distImageCopy.T[:,::-1]
            meanGlobalImage = meanImageIn.T[:,::-1]
            sumDistPassImage = sumDistImage.T[:,::-1]
            dataPassImage = dataPassStack[passNumber].T[:,::-1]
            outerLoop=1
        elif passNumber == 2: # pass "c" ~50s
            dataGlobalImage = dataImageIn.T[::-1,:]
            flagsGlobalImage = flagsImageIn.T[::-1,:]
            distGlobalImage = distImageCopy.T[::-1,:]
            meanGlobalImage = meanImageIn.T[::-1,:]
            sumDistPassImage = sumDistImage.T[::-1,:]
            dataPassImage = dataPassStack[passNumber].T[::-1,:]
            outerLoop=1
        elif passNumber == 3: # pass "d" ~50s
            dataGlobalImage = dataImageIn.T[::-1,::-1]
            flagsGlobalImage = flagsImageIn.T[::-1,::-1]
            distGlobalImage = distImageCopy.T[::-1,::-1]
            meanGlobalImage = meanImageIn.T[::-1,::-1]
            sumDistPassImage = sumDistImage.T[::-1,::-1]
            dataPassImage = dataPassStack[passNumber].T[::-1,::-1]
            outerLoop=1
        elif passNumber == 4: # pass "e" - this one is the native c-ordering, ~7s!
            dataGlobalImage = dataImageIn
            flagsGlobalImage = flagsImageIn
            distGlobalImage = distImageCopy
            meanGlobalImage = meanImageIn
            sumDistPassImage = sumDistImage
            dataPassImage = dataPassStack[passNumber]
            
        elif passNumber == 5: # pass "f" ~9s
            dataGlobalImage = dataImageIn[:,::-1]
            flagsGlobalImage = flagsImageIn[:,::-1]
            distGlobalImage = distImageCopy[:,::-1]
            meanGlobalImage = meanImageIn[:,::-1]
            sumDistPassImage = sumDistImage[:,::-1]
            dataPassImage = dataPassStack[passNumber][:,::-1]
            
        elif passNumber == 6: # pass "g" ~8s
            dataGlobalImage = dataImageIn[::-1,:]
            flagsGlobalImage = flagsImageIn[::-1,:]
            distGlobalImage = distImageCopy[::-1,:]
            meanGlobalImage = meanImageIn[::-1,:]
            sumDistPassImage = sumDistImage[::-1,:]
            dataPassImage = dataPassStack[passNumber][::-1,:]
            
        else: #elif passNumber == 7: # pass "h" ~8s
            dataGlobalImage = dataImageIn[::-1,::-1]
            flagsGlobalImage = flagsImageIn[::-1,::-1]
            distGlobalImage = distImageCopy[::-1,::-1]
            meanGlobalImage = meanImageIn[::-1,::-1]
            sumDistPassImage = sumDistImage[::-1,::-1]
            dataPassImage = dataPassStack[passNumber][::-1,::-1]
        
        #print "a2 pass "+str(passNumber)+"...",
        
        fillGapsA2_Cy (
             { 
                "Data":      dataGlobalImage,
                "Flags":     flagsGlobalImage,
                "Distances": distGlobalImage,
                "Means":     meanGlobalImage,
                "SumDist":   sumDistPassImage,
                "Output":    dataPassImage
             },
             FlagValues,
             _NDV,
             _A2_MAX_NBRS,
             _FILL_BY_RATIO,
             _DATA_ZERO_OFFSET,
             _MAX_ALLOWABLE_RATIO
         )

        #print "done in "+str(time.time()-tDelta)+" seconds"
    
    tDelta = time.time()
    # process numpyisms on the A2 data stack tile-by-tile because it makes temp arrays,
    # and we probably can't afford that memory
    xCorners = np.linspace(0,dataImageIn.shape[1],6).astype(np.int32)
    yCorners = np.linspace(0,dataImageIn.shape[0],6).astype(np.int32)
    medianVals = np.empty_like(dataImageIn) # or mean, depending on choice
    countVals = np.empty(shape=dataImageIn.shape,dtype='byte')
    for x in range(len(xCorners)-1):
        for y in range(len(yCorners)-1):
            x0 = xCorners[x]
            x1 = xCorners[x+1]
            y0 = yCorners[y]
            y1 = yCorners[y+1]
            dataPassSlice = dataPassStack[:,y0:y1,x0:x1]
            countSlice = countVals[y0:y1,x0:x1]
            np.sum(dataPassSlice != _NDV, axis=0, out=countSlice)
            # exclude no-data from mean / median calculation by using nan-aware functions
            dataPassSlice[dataPassSlice == _NDV] = np.nan
            medianValsSlice = medianVals[y0:y1,x0:x1]
            if _A2_PASS_SELECTOR == "MEAN":
                medianValsSlice[:] = bn.nanmean(dataPassSlice, axis=0)
            else:
                medianValsSlice[:] = bn.nanmedian(dataPassSlice, axis=0)
            
            
    dataPassStack = None
    del dataPassStack # and breathe
    gc.collect()
    
    # A2 doesn't modify the flags image, so anywhere that the fill had failed is where it was 
    # needed
    a2FillWasNeeded = np.bitwise_and(flagsImageIn,_FILL_FAILED_FLAG)==_FILL_FAILED_FLAG
    a2FillWasNeededCount = np.count_nonzero(a2FillWasNeeded)
    # we have a value for anywhere that the output isn't _NDV (now np.nan)
    a2FilledLocs = np.logical_and(a2FillWasNeeded,~np.isnan(medianVals))
    a2FilledCount = np.count_nonzero(a2FilledLocs)
    # so copy those a2 results into the "output" data image
    dataImageIn[a2FilledLocs] = medianVals[a2FilledLocs]
    flagsImageIn[a2FilledLocs] -= _FILL_FAILED_FLAG # should use bitwise ^ really
    flagsImageIn[a2FilledLocs] += _A2_SUCCESS_FLAG  # should use bitwise or
    # set distance metric to be the total A2 distance divided by the number of A2 passes it was from
    distImageIn[a2FilledLocs] = sumDistImage[a2FilledLocs] / countVals[a2FilledLocs]
    
    if _CLIP_TO_LIMITS:
        # if we don't have memory to store global stds, we could load them again here 
        # as they are only needed at this point so we don't need them in mem at same 
        # time as the A2 stack
        #with rasterio.open(std_Synoptic_Path) as src:
        if stdsPreloaded is None:
            src = gdal.Open(std_Synoptic_Path)
            bnd = src.GetRasterBand(1)
            stds = bnd.ReadAsArray(xLims[0],yLims[0],totalFillWidth,totalFillHeight)
            bnd = None
            src = None
                #stds = src.read_band(1, window=((totalMarginT,totalMarginT+totalFillHeight),
                #                                        (totalMarginL,totalMarginL+totalFillWidth)), 
                #                     masked=False)
        else:
            stds = stdsPreloaded
        MinMaxClip(dataImageIn, flagsImageIn, meanImageIn, stds,
                   _A2_SUCCESS_FLAG, _MINMAX_CLIP_FLAG, _FLOOR_CEILING_VALUE, _NDV,
                   _DATA_UPPER_LIMIT, _DATA_LOWER_LIMIT)
    #todo - clip to min/max?
    #print "Flattening done in "+str(time.time()-tDelta)+" seconds"
    print ("A2 done, filled {0} locations of {1} required in {2} seconds using {3} of 8 passes".format(
                                                    a2FilledCount,a2FillWasNeededCount,
                                                    str(time.time()-start_time), _A2_PASS_SELECTOR))
    

# Cython functions below this point

All functions that involve iterating over the data and which cannot easily be vectorised have been rewritten in Cython with all optimisations used. For example, bounds checking turned off, all arrays structured and iterated over (excepy in A2) in native C order, all variables typed, etc. (They were first checked with bounds check etc turned on!). 

Most of the centre loops have been parallelised using OpenMP via Cython's tools and set to an appopriate number of threads for the 64-core machines that they were run on.

Thanks to the simplicity of Cython for writing python code that is then translated to optimised C, the algorithms are visually very little changed in comparison to the versions published in the original paper, which were also simple looping functions. 

Of note is that a vectorised version of A1 was produced and tested (based on numpy) but it was far, far slower than these Cython versions even on one thread.

Using IPython notebook (once properly configured) the Cython functions can be compiled and loaded simply by running the cells.

### Despeckle and create flags

In [None]:
%%cython --compile-args=/openmp --link-args=/openmp --force
# %%cython  --compile-args=-fopenmp --link-args=-fopenmp --force
# the syntax of the arguments above is for Linux - it is different for windows and if this isn't set 
# then the openmp support won't actually be switched on (though no error will be seen) and the code 
# will only run on 1 thread
# On windows use 
#%% cython --compile-args=/openmp --link-args=/openmp --force
import numpy as np
from libc.math cimport fabs, sqrt
cimport cython
cimport openmp
from cython.parallel import prange, parallel
@cython.cdivision(True)
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef setSpeckleFlags (
                   dict DataStacks,
                   dict FlagValues,
                   dict SpiralSearchConfig,
                   dict Limits,
                   dict Margins,
                   float _NDV,
                   float _CORRECTION_OFFSET = 0,
                   ):
    
    '''
    Cython implementation of the "despeckle" algorithm; also applies land-sea mask and any systematic correction.
    
    Runs the "despeckle" algorithm, replacing cells that differ by more than 
    N1 standard deviations from the local mean with nodata, and replacing cells that differ 
    by more than N2 standard deviations (where N1 > N2) from the local mean with nodata iif 
    their z nearest neighbours don't on average also do so.
    '''
   
    # data, means, stds should have margins allowing the neighbour search to run
    # flags and landMask should be of the shape that is data shape - margins
    # data and flags will be modified in place ready for passing to A1 routine
    cdef:
        # 2d arrays for the upper and lower validity / possible speckle limits
        float [:,::1] validLower, validUpper, dodgyLower, dodgyUpper
        # 2d array of the zscores for the valid cells in the current day
        float[:,::1] zScoresDay
        int[:,::1] nbrIntCoords
        # loop vars etc
        Py_ssize_t zShape, xShapeTotal, yShapeTotal
        Py_ssize_t xShapeToDespeckle, yShapeToDespeckle
        # mnemonic, yT = total (outer coords), yD is shifted into the output depeckled coords
        Py_ssize_t yT, xT_prv, xD_prv, yD_prv, xDNbr_prv, yDNbr_prv
        float value_prv
        long long speckleCount_Glob, extremeCount_Glob, goodCount_Glob, oceanCount_Glob, clearedSpeckleCount_Glob
        Py_ssize_t nbrIndex_prv
        float nbrZscore_prv, zscoreDiff_prv
        int nbrZscoreCount_prv
        float nbrZscoreTotal_prv
       
        float _BLANK_ZSCORE
    
    # unpack input dictionaries to typed variables
    cdef:
        # thresholds / consts
        float hardLowerLimit = Limits["HARD_LOWER_LIMIT"]
        float hardUpperLimit = Limits["HARD_UPPER_LIMIT"]
        float _SPECKLE_ZSCORE_THRESHOLD = Limits["ZSCORE_ACCEPTANCE_STDS"]
        float stDevValidityThreshold = Limits["INVALID_BEYOND_STDS"]
        float speckleDevThreshold = Limits["SPECKLE_BEYOND_STDS"]
  
        int _SPECKLE_NBR_MIN_THRESHOLD = SpiralSearchConfig["MIN_NBRS_REQUIRED"]
        int _SPECKLE_NBR_MAX_THRESHOLD = SpiralSearchConfig["MAX_NBRS_REQUIRED"]
        int _MAX_NEIGHBOURS_TO_CHECK = SpiralSearchConfig["MAX_NBRS_TO_SEARCH"]
        
        short _OCEAN_FLAG = FlagValues["OCEAN"]
        short _NEVERDATA_FLAG = FlagValues["FAILURE"]
        short _EXTREME_FLAG = FlagValues["EXTREME"]
        short _SPECKLE_FLAG = FlagValues["SPECKLE"]
        
        int marginT = Margins["TOP"]
        int marginB = Margins["BOTTOM"]
        int marginL = Margins["LEFT"] 
        int marginR = Margins["RIGHT"]
        
        float[:,:,::1] data = DataStacks["Data"]
        unsigned char[:,:,::1] flags = DataStacks["Flags"]
        float[:,::1] means = DataStacks["Means"]
        float[:,::1] stds = DataStacks["Stds"]
        char[:,::1] landMask = DataStacks["LandMask"]
        
        float[:,:,::1] outputData
        
    zShape = data.shape[0]
    yShapeTotal = data.shape[1]
    xShapeTotal = data.shape[2]
    # we will only iterate thru cells that are not in the margins
    yShapeToDespeckle = data.shape[1] - (marginT + marginB)
    xShapeToDespeckle = data.shape[2] - (marginL + marginR)
    # and we will only set flags within the cells that A1 cares about
    #yShapeActual = data.shape[1] - (marginTTot + marginBTot)
    #xShapeActual = data.shape[2] - (marginLTot + marginRTot)
    
    assert means.shape[0] == stds.shape[0] == yShapeTotal
    assert means.shape[1] == stds.shape[1] == xShapeTotal
    assert landMask.shape[0] == yShapeTotal
    assert landMask.shape[1] == xShapeTotal
 
    assert flags.shape[1] == yShapeTotal
    assert flags.shape[2] == xShapeTotal
    
    validLower = np.empty(shape=(yShapeTotal, xShapeTotal), dtype='float32', order='c')
    validUpper = np.empty(shape=(yShapeTotal, xShapeTotal), dtype='float32', order='c')
    dodgyLower = np.empty(shape=(yShapeTotal, xShapeTotal), dtype='float32', order='c')
    dodgyUpper = np.empty(shape=(yShapeTotal, xShapeTotal), dtype='float32', order='c')
    zScoresDay = np.empty(shape=(yShapeTotal, xShapeTotal), dtype='float32', order='c')
    
    _BLANK_ZSCORE = -999.0
    speckleCount_Glob = 0
    extremeCount_Glob = 0
    goodCount_Glob = 0
    oceanCount_Glob = 0
    clearedSpeckleCount_Glob = 0
    
    print ("Despeckle: Rejecting data beyond {0!s}s.d. of mean. Nbr search on data beyond {1!s} s.d. of mean.".
           format(stDevValidityThreshold, speckleDevThreshold))
    print ("Nbr searching for {0!s} - {1!s} nbrs within {2!s} spiral steps for z-score tolerance of {3!s}".
           format( _SPECKLE_NBR_MIN_THRESHOLD, _SPECKLE_NBR_MAX_THRESHOLD, _MAX_NEIGHBOURS_TO_CHECK, 
                  _SPECKLE_ZSCORE_THRESHOLD))
    
    # Generate the neighbour spiral search table out to "a bit" further than needed
    _SEARCH_RADIUS =  <int> ((sqrt(_MAX_NEIGHBOURS_TO_CHECK / 3.14)) + 5)
    diam = _SEARCH_RADIUS * 2 + 1
    print ("Despeckle diam = " + str(diam))
    inds = np.indices([diam, diam]) - _SEARCH_RADIUS
    distTmp = np.sqrt((inds ** 2).sum(0))
    npTmpTable = ((inds.T).reshape(diam ** 2, 2))
    npTmpTable = np.append(npTmpTable, distTmp.ravel()[:, None],axis=1)
    
    # sort the table by distance then x then y (the arguments are last-sort-first)
    order = np.lexsort((npTmpTable[:, 1],npTmpTable[:, 0],npTmpTable[:, 2]))
    npTmpTable = np.take(npTmpTable, order, axis=0)
    
    # transfer to a C-side object transposed to have three rows and many columns and in 
    # C-contiguous layout, so that cython can access individual nbr coord sets more quickly
    nbrTable = np.copy((npTmpTable[npTmpTable[:,2] <= _SEARCH_RADIUS]).T, order='c')
    # cast the columns that will be used as array indices to int type once here, rather 
    # than casting repeatedly inside the inner loop
    nbrIntCoords = np.asarray(nbrTable[0:2,:]).astype(np.int32)
    
    # We can't modify the input in this algorithm as we have parallelised it
    # and we look at adjacent results in setting the cell's value - we only want to use
    # original data for this check, so we have to keep a pristine copy
    outputData = np.empty_like(data, order='c')
    outputData[:] = _NDV
    
    # We need to run speckle check IN the margins so the data in those margins can be eliminated if necessary
    # and not used in checking other values not in the margins
    # However the speckle search radius is smaller than the A1 radius
    # So we give speckle check a smaller margin so it can be happy but everything that A1 needs 
    # has been despeckled
    
    # Apply any correction offset (to correct messed-up celsius-kelvin conversion!!)
    if _CORRECTION_OFFSET != 0:
        for z in range (zShape):
            with nogil, parallel(num_threads=20):
                for yT in prange (yShapeTotal, schedule='static', chunksize=200):
                    xT_prv = -1
                    for xT_prv in range (xShapeTotal):
                        if data[z, yT, xT_prv] != _NDV:
                            data[z, yT, xT_prv] = data[z, yT, xT_prv] + _CORRECTION_OFFSET
                
    # Precalculate the thresholds. These are the same for every day in the stack so spend 
    # some memory to avoid doing it 15 times (this is the only economy in calling this routine
    # with a 3d array, so if memory is an issue just modify to run on a 2d array one day at a time)
    # run for total shape i.e. including speckle margins
    with nogil, parallel(num_threads=20):
        for yT in prange (yShapeTotal, schedule='static', chunksize=200):
            xT_prv = -1
            for xT_prv in range (xShapeTotal):
                if landMask[yT,xT_prv] == 0:
                    continue
                if means[yT, xT_prv] == _NDV:
                    continue
                if _CORRECTION_OFFSET != 0:
                    means[yT, xT_prv] = means[yT, xT_prv] + _CORRECTION_OFFSET
                validLower[yT,xT_prv] = means[yT,xT_prv] - stds[yT,xT_prv] * stDevValidityThreshold
                validUpper[yT,xT_prv] = means[yT,xT_prv] + stds[yT,xT_prv] * stDevValidityThreshold
                dodgyLower[yT,xT_prv] = means[yT,xT_prv] - stds[yT,xT_prv] * speckleDevThreshold
                dodgyUpper[yT,xT_prv] = means[yT,xT_prv] + stds[yT,xT_prv] * speckleDevThreshold

    # now identify cells which _may_ be speckles by comparison to the limits just calculated
    # also apply land-sea mask and calculate z scores in this pass
    # Run on full dataset incl margins
    for z in range (zShape): 
        #re initialise zscoresday
        zScoresDay[:] = _BLANK_ZSCORE
        with nogil,parallel(num_threads=10):
            # do for everything, including speckle margins
            for yT in prange (yShapeTotal, schedule='static', chunksize=200):
                # assign to the inner loop's variables so that Cython will make them thread-private
                xT_prv = -1
                value_prv = _NDV
                nbrIndex_prv = -1
                nbrZscoreCount_prv = 0
                nbrZscoreTotal_prv = 0.0
                nbrZscore_prv = 0.0
                zscoreDiff_prv = 0.0
                
                for xT_prv in range(xShapeTotal):
                    if landMask[yT, xT_prv] == 0:
                        # do not do anything where it is not in the land
                        # just set the flags image to record this
                        flags[z, yT, xT_prv] = flags[z, yT, xT_prv] | _OCEAN_FLAG
                        
                        # Do not copy across any data existing in the sea. It's likely to be squiffy
                        # (EVI in particular) and if we leave it, the fill process inland could use these 
                        # dodgy values
                        # (Output data is already NDV from initialisation)
                        
                        # Tracking
                        oceanCount_Glob += 1
                        continue
                        
                    # if no mean then it means we never have data in the stack and will not be able to 
                    # produce a fill
                    if means[yT, xT_prv] == _NDV:
                        flags[z, yT, xT_prv] = flags[z, yT, xT_prv] | _NEVERDATA_FLAG
                        continue
                        
                    value_prv = data[z, yT, xT_prv]
                    # is the value outside extreme thresholds? delete if so
                    if (value_prv < hardLowerLimit or value_prv > hardUpperLimit or 
                        value_prv < validLower[yT, xT_prv] or value_prv > validUpper[yT, xT_prv] 
                        or value_prv == _NDV):
                        if value_prv != _NDV:
                            flags[z, yT, xT_prv] = flags[z, yT, xT_prv] | _EXTREME_FLAG
                            extremeCount_Glob += 1
                        # no flag to set if it IS nodata because the fact it is nodata is all the flag
                        # we need...
                        continue
                        
                    # implicit else
                    # pre-calculate the z-scores for all cells that are not definitely invalid
                    zScoresDay[yT, xT_prv] = (value_prv - means[yT, xT_prv]) / stds[yT, xT_prv]
                    outputData [z, yT, xT_prv] = value_prv
                    # see if the location is a possible speckle
                    if value_prv >= dodgyLower[yT, xT_prv] and value_prv <= dodgyUpper[yT, xT_prv]:
                        # it's within the local limits, so it counts as good data, so pass through
                        goodCount_Glob += 1
                    else:
                        # it's outside the speckle thresholds, but within the validity thresholds
                        # so set the speckle flag (we may remove it again later after the nbr check)
                        flags[z, yT, xT_prv] = flags[z, yT, xT_prv] | _SPECKLE_FLAG 
        
        # now go through again, the z-scores are all populated wherever possible so no need 
        # to re-calculate every time a nbr is checked as in orginal implementation
        # do not run for speckle margins - use inner shape - but this will still include the A1 margins
        # if everything's been set up right!
        with nogil,parallel(num_threads=10):
            for yT in prange (yShapeToDespeckle, schedule='dynamic', chunksize=200):
                
                xT_prv = -1
                xD_prv = -1
                yD_prv = yT + marginT
                xDNbr_prv = -1
                yDNbr_prv = -1
                
                nbrIndex_prv = -1
                nbrZscoreCount_prv = 0
                nbrZscoreTotal_prv = 0.0
                nbrZscore_prv = 0.0
                zscoreDiff_prv = 0.0
                
                for xT_prv in range(xShapeToDespeckle):
                    xD_prv = xT_prv + marginL
                    nbrIndex_prv = -1
                    nbrZscoreCount_prv = 0
                    nbrZscoreTotal_prv = 0.0
                    nbrZscore_prv = 0.0
                    zscoreDiff_prv = 0.0
                    if (flags[z, yD_prv, xD_prv] & _SPECKLE_FLAG) != _SPECKLE_FLAG:
                        continue
                    
                    # else we are on a possible speckle
                    nbrZscoreCount_prv = 0
                    nbrZscoreTotal_prv = 0.0
                    # spiral search of the nbrs to see if they too are far from 
                    # the mean in which case this isn't a speckle after all
                    for nbrIndex_prv in range(1, _MAX_NEIGHBOURS_TO_CHECK + 1):
                        # This loop may run 100s or 1000s of times for every potential
                        # speckle cell, i.e. potentially trillions of times.
                        # So it's really important to get this loop fast.
                        if nbrZscoreCount_prv == _SPECKLE_NBR_MAX_THRESHOLD:
                            # we've found sufficient neighbours so stop looking
                            break
                        # use int-type coords array to avoid cast op in tight loop
                        xDNbr_prv = xD_prv + nbrIntCoords[0, nbrIndex_prv]
                        yDNbr_prv = yD_prv + nbrIntCoords[1, nbrIndex_prv]

                        # is the requested neighbour cell within data bounds and with data?
                        if (xDNbr_prv >= 0 and xDNbr_prv < xShapeTotal and 
                                yDNbr_prv >= 0 and yDNbr_prv < yShapeTotal and
                                zScoresDay[yDNbr_prv, xDNbr_prv] != _BLANK_ZSCORE):
                            nbrZscoreCount_prv = nbrZscoreCount_prv + 1 # tracking
                            nbrZscoreTotal_prv = nbrZscoreTotal_prv + zScoresDay[yDNbr_prv, xDNbr_prv]
                    
                    # did we find enough neighbours? 
                    if nbrZscoreCount_prv >= _SPECKLE_NBR_MIN_THRESHOLD:
                        # were the neighbours similar to this cell in terms of distance from mean?
                        nbrZscore_prv = nbrZscoreTotal_prv / nbrZscoreCount_prv
                        zscoreDiff_prv = fabs(nbrZscore_prv - zScoresDay[yD_prv, xD_prv])
                        if zscoreDiff_prv < _SPECKLE_ZSCORE_THRESHOLD:
                            # this pixels neighbours are also far from the mean, it's probably
                            # a volcano or a country that caught fire or something, so we'll believe it,
                            # clear the flag, and carry through the original value
                            flags[z, yD_prv, xD_prv] = flags[z, yD_prv, xD_prv] ^ _SPECKLE_FLAG
                            clearedSpeckleCount_Glob += 1
                            continue
                    # insufficient neighbours or the neighbours are less extreme.
                    # so this is a speckle; delete the data
                    speckleCount_Glob += 1
                    outputData[z, yD_prv, xD_prv] = _NDV
                    
    print ("Speckle count:    " + str(speckleCount_Glob))
    print ("Extreme count:    " + str(extremeCount_Glob))
    print ("Good count:       " + str(goodCount_Glob))
    print ("Cleared Speckle:  " + str(clearedSpeckleCount_Glob) )
    print ("Ocean count:      " + str(oceanCount_Glob))
    return (outputData, flags)

## A1 Algorithm

In [None]:
%%cython --compile-args=/openmp --link-args=/openmp --force
#%%cython  --compile-args=-fopenmp --link-args=-fopenmp --force
cimport cython
cimport numpy as np
from cython.parallel import prange, parallel
import numpy as np
from libc.stdlib cimport abs
from libc.math cimport sqrt
cdef int _TRIM_MIN_MAX = 1

@cython.boundscheck(False)
@cython.wraparound(False)
cdef int[::1] alternates_cy(int lim):
    ''' yields 1, -1, 2, -2, etc up to lim '''
    cdef int x
    x = 1
    cdef int[::1] arr = np.empty(shape=(lim*2),dtype=np.int32,order='c')
    for x in range(0,lim): 
        arr[2*x] = x+1
        arr[2*x+1] = -(x+1)
        x += 1
    return arr

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
# pass in the whole data that should be checked, and optionally the margins that are provided
# - we will only fill gaps within the portion of the data that isn't excluded in the margin,
# but we will use the margin data to provide neighbour data for those pixels
# So for a tiled run data can (should) be provided with a padding of _SEARCH_RADIUS and the 
# margin set to this value also. That way the algorithm can get values for "edge pixels" of its
# input data

cpdef fillGapsA1_Cy(dict DataStacks, # has items Data, Flags, DistTemplate (optional), KnownUnfillable (optional)
                    dict FlagValues,
                    dict SpiralSearchConfig,
                    dict Margins, # has items Top, Bottom, Left, Right
                    float _NDV, 
                    char FillByRatios = 0,
                    float RatioAbsZeroPoint = 0,
                    float RatioLimit = 1, 
                    char RunFillFromPos = 0
                ):
    
    """
    Optimised, multithreaded Cython (C) implementation of the A1 gapfilling algorithm.
    
    fillGapsA1_Cy(dict DataStacks, dict FlagValues, dict SpiralSearchConfig, dict Margins, 
                float _NDV, char FillByRatios=0, float RatioAbsZeroPoint=0, float RatioLimit=1,
                char RunFillFromPos = 0) 
                    -> 
                    (float[:,:,::1] output, float[:,:,::1] distances, dict info)
    
    Runs A1 on a stack of data of a given calendar day; the stack should have one layer (0th dimension) 
    for each year. The algorithm can be run in a tiled or sliced fashion by passing in a stack 
    that isn't the full extent. In this case the stack passed should have "surplus" data at the edges 
    so that the neighbourhood search can proceed. The size of these edges should be specified with the 
    margin parameters. These should be at least as big as the neighbourhood search radius which is set 
    to 3142 cells area = ~31km radius. 
    The flags should be the same shape as the data EXCLUDING the margins. 
    Returns a tuple containing the filled data and distances and fill statistics for logging.
    """
    cdef:
        # define all arrays (cython memoryview objects) as being c-contiguous 
        # so cython doesn't treat as strided (slower access)
        # this gives major speedup in access in tight loops as the cpu caching can 
        # work more effectively
        
        # intermediate arrays
        double [:,::1] nbrTable
        int[:,::1] nbrIntCoords
        char [:,::1] neverDataLocs
        char  noTemplate
        # OUTPUTS
        float [:,:,::1] outputData
        float [:,:,::1] outputDists
        unsigned char [:,:,::1] outputFlags
        # in-loop working vars
        Py_ssize_t z, y 
        Py_ssize_t zShape, yShape, xShape
        Py_ssize_t yShapeTotal, xShapeTotal
        int [::1] deltas
        double posInf,negInf
        
        #thread-private ones:
        Py_ssize_t x_prv
        Py_ssize_t newZ_prv, xi_prv, yi_prv, xNbr_prv, yNbr_prv
        int spiralStart_prv
        Py_ssize_t max_idx_prv, min_idx_prv
        double ws_prv, sw_prv,  pfv_prv, weight_prv, wfv_prv
        double altValue_prv, currentValue_prv
        int nfound_prv
        int delta_prv, deltaidx_prv, nbrIndex_prv
        
        # intermediate values for ratio-based calculations
        # (now just using the same vars whichever approach is used)
        #double max_Ratio_prv, maxR_Dist_prv, maxR_wpfv_prv, maxR_weight_prv 
        #double min_Ratio_prv, minR_Dist_prv, minR_wpfv_prv,  minR_weight_prv
        #double ratio_prv
        # intermediate values for offset (addition) based calculations
        double max_Diff_prv, maxD_Dist_prv, maxD_wpfv_prv, maxD_weight_prv 
        double min_Diff_prv, minD_Dist_prv, minD_wpfv_prv, minD_weight_prv
        double valueDiff_prv
        
        double sumDist_prv
        unsigned char flag_prv
        
        # First loop stuff
        Py_ssize_t xi, yi, x
        int nfound
        
        # Unpack the parameter dictionaries to typed variables 
        # (the dictionaries were just to clean up the signature a bit in the absence of motivation
        # to make some specific type to pass the data more cleanly) 
        char _OCEAN_FLAG = FlagValues["OCEAN"]
        char _FAILURE_FLAG = FlagValues["FAILURE"]
        char _SUCCESS_FLAG = FlagValues["A1_FILLED"]
        char _SUCCESS_WAS_FULL_FLAG = FlagValues["A1_FULL"]
        int marginT = Margins["TOP"]
        int marginB = Margins["BOTTOM"]
        int marginL = Margins["LEFT"] 
        int marginR = Margins["RIGHT"]
        
        float [:,:,::1] dayDataStack = DataStacks["Data"]
        unsigned char[:,:,::1] inputFlags = DataStacks["Flags"] # embeds the land-sea mask (sea=1 land =0)
        unsigned char[:,:,::1] dataDistTemplate=None
        unsigned char[:,::1] knownUnfillableLocs=None
        float _AbsZeroPoint = RatioAbsZeroPoint
        float _MaxAllowableRatio = RatioLimit
        float _MinAllowableRatio = 1.0 / _MaxAllowableRatio
        
        #  how many locations should be checked in spiral search (gives radius). #3142 
        int _MAX_NEIGHBOURS_TO_CHECK = SpiralSearchConfig["MAX_NBRS_TO_SEARCH"] 
        
        # Only use the values gleaned from up to this number of cells (Even if more are avail within radius) #640
        int _FILL_THRESHOLD = SpiralSearchConfig["MAX_NBRS_REQUIRED"] 
        
        # 320 min number of values that must be found to have a valid fill
        int _FILL_MIN = SpiralSearchConfig["MIN_NBRS_REQUIRED"]
        
        # calc the distance that is implied by the max spiral search length
        int _SEARCH_RADIUS = <int> (sqrt((_MAX_NEIGHBOURS_TO_CHECK*2.0) / 3.14))  + 1 
  
        # calculation / tracking vars: will be reduction variables - incremented but not read by threads
        # for the whole globe there are 933,120,000 pixels per image so this exceeds 
        # max val of uint after 4 images!
        long long totalProcessedGapCells,  totalCells, oceanCells, neverDataCells
        long long scannedLevels, scannedNeighbours, usedNeighbours 
        long long filledBelowThreshold, noPairsFound, insufficientPairsFound, filledToThreshold 
        long long gapsAtUnfillableLocs, gapsTooBig, dataGood
                  
    # can precalc dist from gap to nearest data for more  efficient spiral search 
    # (no need to start spiral search closer than the known nearest data pixel)
    if DataStacks["DistTemplate"]:
        dataDistTemplate = DataStacks["DistTemplate"]
    # can precalc locations where no alternate years exist (so no fill will be possible)
    if DataStacks["KnownUnfillable"]:
        knownUnfillableLocs = DataStacks["KnownUnfillable"]
           
    zShape = dayDataStack.shape[0]
    # size of the input data including search margins
    yShapeTotal = dayDataStack.shape[1]
    xShapeTotal = dayDataStack.shape[2]
    # size of the data that needs to be filled.
    # we will only iterate thru cells that are not in the margins
    # (they are equal (zero margin) if filling a global image in one go, and the margin is also zero
    # for the slice edges at the edge of the global images)
    yShape = dayDataStack.shape[1] - (marginT + marginB)
    xShape = dayDataStack.shape[2] - (marginL + marginR)
    
    # cython doesn't have inf defined 
    posInf = np.inf
    negInf = -np.inf
      
    # generate nbr distance table (offset coordinates for the spiral search steps) using numpy
    diam = _SEARCH_RADIUS * 2 + 1
    inds = np.indices([diam,diam]) - _SEARCH_RADIUS
    distTmp = np.sqrt((inds ** 2).sum(0))
    npTmpTable = ((inds.T).reshape(diam**2, 2))
    npTmpTable = np.append(npTmpTable, distTmp.ravel()[:,None],axis=1)
    # sort the table by distance then x then y (the arguments are last-sort-first)
    order = np.lexsort((npTmpTable[:,1], npTmpTable[:,0], npTmpTable[:,2]))
    npTmpTable = np.take(npTmpTable, order, axis=0)
    
    # the C-side result of the distance table calculations
    # transpose it to have three rows and many columns and take a C contiguous copy of this
    # so that cython can access individual nbr coord sets more quickly
    nbrTable = np.copy((npTmpTable[npTmpTable[:,2] <= _SEARCH_RADIUS]).T,order='c')
    
    # the distance table is stored with a float type (for the distances) but we need ints 
    # for indexing based on coords. we can cast at the time we get them out, but as this happens 
    # in the innermost loop it is done trillions of times and so the time penalty of that 
    # becomes signficant. So, store an int version of the coords array
    nbrIntCoords = np.asarray(nbrTable[0:2,:]).astype(np.int32)
    
    # initialise C arrays each from an appropriate empty numpy object
    # these will have the same size as the area we're actually filling i.e. not the (buffered)
    # input section
    outputData = np.empty(shape=(zShape, yShape, xShape), dtype='float32', order='c')
    outputDists = np.empty(shape=(zShape, yShape, xShape), dtype='float32', order='c')
    outputFlags = np.zeros(shape=(zShape, yShape, xShape), dtype='uint8', order='c')
    outputData[:] = _NDV
    outputDists[:] = _NDV
    
    # handle optional parameters
    if dataDistTemplate is None:
        noTemplate = 1 
    else:
        noTemplate = 0
        assert (dataDistTemplate.shape[0] == zShape and
                dataDistTemplate.shape[1] == yShape and 
                dataDistTemplate.shape[2] == xShape
                )
    
    # diagnostics; TODO use logging
    print ("Running A1 (Full Spiral Search).")
    print ("No data template: {0!s}. Using ratio method: {1!s}. Searching for {2!s} - {3!s} nbrs within {4!s} spiral steps".
           format(noTemplate, FillByRatios, _FILL_MIN, _FILL_THRESHOLD, _MAX_NEIGHBOURS_TO_CHECK))
    print ("Calculating nbr table out to radius of {0!s}.".format(_SEARCH_RADIUS))
    print ("Filling from stack position {0!s}.".format(RunFillFromPos))
                                                                      
    assert inputFlags.shape[1] == yShapeTotal
    assert inputFlags.shape[2] == xShapeTotal
    
    if knownUnfillableLocs is None: # i.e. has not been precalced with np.all
        # knownUnfillableLocs will a 2D map of cells where there is no data in any year (z axis)
        knownUnfillableLocs = np.ones(shape=(yShape, xShape), dtype=np.uint8, order='c')
        # locate cells that CAN be filled (negating it like this means we can keep 
        # the loop order cache-friendly - as opposed to iterating through Z at each 
        # y,x location until/unless we find a value > 0. 
        # This process is still slower on 1 thread than numpy.all, but not significant overall,
        # and multithreading makes it way faster to do here
        with nogil:
            for z in range(zShape):
                # work is trivial and fairly well balanced so use static schedule
                for y in prange(yShape, schedule='static', num_threads=20): 
                    x_prv = -1
                    yi_prv = y + marginT
                    for x_prv in range(xShape):
                        xi_prv = x_prv + marginL
                        if dayDataStack[z, yi_prv, xi_prv] != _NDV:
                            knownUnfillableLocs[y, x_prv] = 0
    else:
        assert (knownUnfillableLocs.shape[0] == yShape and
                knownUnfillableLocs.shape[1] == xShape
                )
    
    if FillByRatios:
        # we need to work with ratios between cells but will be running this on non-ratio scale variables
        # (i.e. where 0 is an arbitrary point) such as temp in celsius. This could lead to div/0 giving infinite
        # ratios, and it's not valid anyway (e.g. 20/10 != 10/-10 even though the interval is the same).
        # The latter point doesn't matter _much_ because we only use the ratio to multiply back against an alternate
        # value which will be similar. Use a large "absolute zero" relative to the values and it matters even less
        # (but not too large, to avoid FP errors)
        with nogil:
            for z in range(zShape):
                for y in prange(yShapeTotal, schedule='static', num_threads=20):
                    x_prv = -1
                    for x_prv in range(xShapeTotal):
                        if dayDataStack[z, y, x_prv] != _NDV:
                            dayDataStack[z, y, x_prv] = dayDataStack[z, y, x_prv] -  _AbsZeroPoint
    else:
        _AbsZeroPoint = 0 # to avoid need for further check in loop below
        
    # initialise metrics
    totalProcessedGapCells = 0 # for testing
    totalCells = 0 # for testing
    scannedLevels = 0 # for testing
    scannedNeighbours = 0 # for testing
    filledToThreshold = 0 
    filledBelowThreshold=0
    noPairsFound = 0
    insufficientPairsFound = 0
    
    gapsAtUnfillableLocs = 0
    gapsTooBig=0
    usedNeighbours = 0
    gapsInKnownUnfillable = 0
    dataGood = 0
    oceanCells = 0
    neverDataCells = 0
    
    # a single call to produce the alternating forward/back year offsets
    deltas = alternates_cy(zShape)
    
    # now do A1 gap fill! Iterate through the entire data stack 
    # The arrays are C-ordered so it's fastest in terms of CPU cache to have 
    # X on the innermost loop. Parallelise over the y axis.
    
    # Telling cython how to parallelise the variables is done implicitly by how you access them.
    # - assign to a variable if you want it to be thread-private i.e. x = x + 1
    # - increase it in-place if you want it to be shared i.e. x += 1
    # The latter case turns it into a "reduction" variable which cannot be read in the parallel 
    # loop. 
    # Therefore all variables that need to be thread private must be made so by artifially assigning 
    # (something) to them within the parallel block. 
    # Check the generated C code to be sure it's worked as intended!
    with nogil, parallel(num_threads=10): # change num cores here
        for z in range(zShape):
            if z >= RunFillFromPos:
                # experiments with chunksizes 500, 50, 20, 2, and parallelising on z axis instead,
                # showed 20 to be the quickest (tradeoff between allocation overhead vs one thread doing 
                # a "gappy" area all by itself). It's likely to vary by machine and num threads due to 
                # CPU cache differences i.e. the spiral search needs a lot of cache to be efficient
                # as it crosses rows so the access cannot be completely cache-economical. 
                # However 6 threads was still slower than 12 on a 6-physical 12-virtual machine
                for y in prange(yShape, schedule='dynamic', chunksize=20):
                    # assign something to all variables that are used in filling one cell and need 
                    # to be private (i.e. they get modified but we don't want the iteration that fills 
                    # another cell to know about that).
                    # It's verbose but this tricks cython into making them thread-private
                    nfound_prv = 0
                    ws_prv = 0
                    sw_prv = 0
                    weight_prv=0
                    pfv_prv=0
                    #ratio_prv=0
                    valueDiff_prv = 0
                    delta_prv=0
                    deltaidx_prv = -1
                    currentValue_prv=0
                    altValue_prv=0
                    xNbr_prv=-1
                    yNbr_prv=-1
                    xi_prv=-1
                    yi_prv=-1
                    x_prv=-1
                    newZ_prv=-1
                    nbrIndex_prv=-1
                    sumDist_prv=0

                    max_Diff_prv = negInf
                    maxD_Dist_prv = 0
                    maxD_weight_prv=0
                    maxD_wpfv_prv=0
                    
                    min_Diff_prv = posInf
                    minD_Dist_prv = 0
                    minD_wpfv_prv = 0
                    minD_weight_prv = 0
                    
                    max_idx_prv = 0
                    min_idx_prv = 0
                    flag_prv = -1
                    
                    yi_prv = y + marginT
                    
                    for x_prv in range(xShape):
                        xi_prv = x_prv + marginL
                        #re initialise variables for this cell
                        nfound_prv= 0
                        ws_prv = 0.0
                        sw_prv = 0
                        weight_prv = 0
                        pfv_prv = 0
                        #ratio_prv = 0
                        valueDiff_prv = 0
                        delta_prv = 0
                        deltaidx_prv = -1
                        currentValue_prv = 0
                        altValue_prv = 0
                        xNbr_prv = -1
                        yNbr_prv = -1
                        newZ_prv = -1
                        nbrIndex_prv = -1
                        sumDist_prv = 0

                        max_Diff_prv = negInf
                        maxD_Dist_prv = 0
                        maxD_weight_prv = 0
                        maxD_wpfv_prv = 0

                        min_Diff_prv = posInf
                        minD_Dist_prv = 0
                        minD_wpfv_prv = 0
                        minD_weight_prv = 0

                        max_idx_prv = 0
                        min_idx_prv = 0
                        flag_prv = -1
                    
                        totalCells += 1 # testing log
                        
                        if ((inputFlags[z, yi_prv, xi_prv] & _OCEAN_FLAG) == _OCEAN_FLAG):
                            oceanCells += 1
                            outputFlags[z, y, x_prv] = inputFlags[z, yi_prv, xi_prv]
                            #in the ocean do not copy across (even if there is data; MODIS may not match shore cleanly)
                            #output is already nodata and flag is alraady set so just 
                            continue
                        
                        if ((inputFlags[z, yi_prv, xi_prv] & _FAILURE_FLAG) == _FAILURE_FLAG):
                            # also do this if failure flag (2) is set (by despeckle algorithm indicating that mean 
                            # was ND thus never any data on any calendar day)
                            outputFlags[z, y, x_prv] = inputFlags[z, yi_prv, xi_prv]
                            neverDataCells +=1
                            continue

                        currentValue_prv = dayDataStack[z, yi_prv, xi_prv]

                        #if value is valid just copy it across
                        if not currentValue_prv == _NDV: # 0.0 is valid!!
                            # we have data, copy it across - output does not have margins
                            outputData[z, y, x_prv] = currentValue_prv + _AbsZeroPoint
                            outputFlags[z, y, x_prv] = inputFlags[z, yi_prv, xi_prv]
                            dataGood += 1
                            # flag is already non-ocean so just
                            continue

                        # if it's a location that has no data in any alternate year of this calendar day
                        # then we also cannot fill
                        if knownUnfillableLocs[y, x_prv] == 1:
                            totalProcessedGapCells += 1
                            gapsAtUnfillableLocs += 1
                            # for testing to check failure reason set this to 3?
                            outputFlags[z, y, x_prv] = outputFlags[z, y, x_prv] | _FAILURE_FLAG
                            # leave output at _NDV and just 
                            continue

                        # if we have a precalculated "large gaps" template does this show that 
                        # the cell will be unfillable due to lack of neighbours?
                        # The template gives locations that are more than _SEARCH_RADIUS
                        # away from any data point in the same year, i.e. where the spiral search will not
                        # be able to find anything. Precalculating this in scipy is generally somewhat faster 
                        # than leaving the code to iterate through everything here IF we only use 1 core
                        # NB sc evaluation makes this safe whether or not dataDistTemplate exists
                        if (noTemplate == 0 and dataDistTemplate[z, yi_prv, xi_prv] > _SEARCH_RADIUS + 1):
                            # TODO set a flag for this
                            outputFlags[z, y, x_prv] = outputFlags[z, y, x_prv] | _FAILURE_FLAG
                            totalProcessedGapCells += 1
                            gapsTooBig += 1
                            continue

                        # else attempt to fill the gap, using A1
                        if 1: # can't be bothered to rejig indents
                            # METRICS
                            totalProcessedGapCells += 1
                            flag_prv = inputFlags[z, yi_prv, xi_prv]
                            # deltas will be (1, -1, 2, -2, ...)
                            for deltaidx_prv in range(0, deltas.shape[0]):
                                delta_prv = deltas[deltaidx_prv]
                                newZ_prv = z + delta_prv
                                # does this zDelta fall off the start or end of the stack?
                                # or is the alternate year also blank here?
                                # (or, obviously, are we checking the current year!)
                                if (newZ_prv >= zShape or newZ_prv < 0 
                                    or newZ_prv == z
                                    or dayDataStack[newZ_prv, yi_prv, xi_prv] == _NDV):
                                    continue
                                # otherwise...
                                scannedLevels += 1 # tracking year-switches
                                
                                # note that the alternate value can be zero. on a ratio based fill 
                                # this would result in a fill value of zero, too, no matter what the nbr values are
                                altValue_prv = dayDataStack[newZ_prv, yi_prv, xi_prv]
                                
                                #step thru spiral-outward coords table
                                spiralStart_prv = 1 # zero is this cell
                                # if possible, check where to start the search based on known closest data
                                if noTemplate == 0:
                                    spiralStart_prv = <int>(((dataDistTemplate[z, yi_prv, xi_prv]-1)**2) * 3.141)
                                    if spiralStart_prv < 1:
                                        spiralStart_prv = 1
                                    elif spiralStart_prv > _MAX_NEIGHBOURS_TO_CHECK:
                                        spiralStart_prv = _MAX_NEIGHBOURS_TO_CHECK
                                
                                # spiral search for this alternate year
                                for nbrIndex_prv in range(spiralStart_prv, _MAX_NEIGHBOURS_TO_CHECK + 1):
                                    #scannedNeighbours += 1 # don't track as it's unnecessary op in inner loop

                                    # This is the inner loop that runs once for every neighbour of 
                                    # every gap cell, i.e. potentially many trillions of times.
                                    # So it's really important to get this loop fast.

                                    # Total time of this loop where the if below fails is now ~4.7nS
                                    # or around 12 CPU cycles. Considering there are potentially 4 array
                                    # accesses and several comparisons this is probably as good as we can 
                                    # get and it's clear that the L1D CPU cache is mostly being hit (as a 
                                    # single access to L2 cache is ~14 cycles)

                                    # moving int cast outside the loop saves 
                                    # ~ 2.7nS per loop i.e. >50% on an invalid iteration
                                    xNbr_prv = xi_prv + nbrIntCoords[0, nbrIndex_prv]
                                    yNbr_prv = yi_prv + nbrIntCoords[1, nbrIndex_prv]

                                    # is the requested neighbour cell within data bounds?
                                    # (allowing for the fact that the input data can have greater 
                                    # extent than the cells we are searching to allow tiled processing)
                                    # and does it have data in both relevant years?
                                    # Put current year check before alternate year check as current year is more
                                    # likely to not have data (given we are already at a gap) so it's more 
                                    # efficient to bail out there first
                                    if (xNbr_prv >= 0 and xNbr_prv < xShapeTotal and 
                                            yNbr_prv >= 0 and yNbr_prv < yShapeTotal and
                                            dayDataStack[z, yNbr_prv, xNbr_prv] != _NDV and 
                                            dayDataStack[newZ_prv, yNbr_prv, xNbr_prv] != _NDV):
                                        # we're good to go! do the maths for this contributing cell
                                        usedNeighbours +=  1 # tracking
                                        
                                        if FillByRatios == 0:
                                        #calculate difference not ratio
                                            valueDiff_prv = (dayDataStack[z, yNbr_prv, xNbr_prv] 
                                                             - dayDataStack[newZ_prv, yNbr_prv, xNbr_prv])
                                            pfv_prv = altValue_prv + valueDiff_prv
                                        else:
                                            # calculate ratio, taking some precautions to avoid stupid values 
                                            # when one is close to zero
                                            valueDiff_prv = (dayDataStack[z, yNbr_prv, xNbr_prv] 
                                                             / dayDataStack[newZ_prv, yNbr_prv, xNbr_prv])
                                            if (dayDataStack[newZ_prv, yNbr_prv, xNbr_prv] == 0 
                                                or valueDiff_prv > _MaxAllowableRatio):
                                                valueDiff_prv = _MaxAllowableRatio
                                            elif valueDiff_prv < _MinAllowableRatio:
                                                valueDiff_prv = _MinAllowableRatio
                                            pfv_prv = altValue_prv * valueDiff_prv + _AbsZeroPoint
                                        # implicit assumption that one year and one cell distance have the same weighting
                                        weight_prv = (1.0 / abs(delta_prv)) * (1.0 / nbrTable[2,nbrIndex_prv])
                                        ws_prv += pfv_prv * weight_prv
                                        sw_prv += weight_prv
                                        # the only thing the replacement values array got used for in original IDL was 
                                        # calculating the mean contibuting distance. we don't need to 
                                        # do all those assignments for that
                                        sumDist_prv = sumDist_prv + nbrTable[2,nbrIndex_prv]
                                        # track the things we need to trim min/max
                                        if valueDiff_prv < min_Diff_prv:
                                            min_Diff_prv = valueDiff_prv
                                            minD_Dist_prv = nbrTable[2,nbrIndex_prv]
                                            minD_wpfv_prv = pfv_prv*weight_prv
                                            minD_weight_prv = weight_prv
                                        if valueDiff_prv > max_Diff_prv:
                                            max_Diff_prv = valueDiff_prv
                                            maxD_Dist_prv = nbrTable[2,nbrIndex_prv]
                                            maxD_wpfv_prv = pfv_prv*weight_prv
                                            maxD_weight_prv = weight_prv
                                        nfound_prv = nfound_prv + 1
                                        #nfound = nfound + 1
                                        if nfound_prv == _FILL_THRESHOLD:
                                            break
                                if nfound_prv == _FILL_THRESHOLD:
                                    break
                            
                            if nfound_prv >= _FILL_MIN:
                                # we have found between min and threshold pairs
                                flag_prv = flag_prv | _SUCCESS_FLAG
                                if nfound_prv == _FILL_THRESHOLD:
                                    # we have found the upper limit of pairs (full fill)
                                    # (set both success flags)
                                    filledToThreshold += 1
                                    flag_prv = flag_prv | _SUCCESS_WAS_FULL_FLAG
                                else:
                                    filledBelowThreshold += 1
                                if _TRIM_MIN_MAX:
                                    ws_prv = ws_prv - (minD_wpfv_prv + maxD_wpfv_prv)
                                    sw_prv = sw_prv - (minD_weight_prv + maxD_weight_prv)
                                    sumDist_prv = sumDist_prv - (minD_Dist_prv + maxD_Dist_prv)
                                    nfound_prv = nfound_prv - 2
                                    #flag = 12 (old code recorded this per-pixel, don't see why)
                                wfv_prv = ws_prv / sw_prv
                                # record the result!!
                                outputData[z, y, x_prv] = wfv_prv
                                outputDists[z, y, x_prv] = sumDist_prv / nfound_prv
                            else:
                                flag_prv = flag_prv | _FAILURE_FLAG
                                # do not do anything to output: leave it at no-data
                                if nfound_prv > 0:
                                    insufficientPairsFound += 1
                                else:
                                    noPairsFound += 1
                            outputFlags [z, y, x_prv] = flag_prv
    print ("Total cells scanned:     "+str(totalCells))
    print ("Total cells with good data: "+str(dataGood))
    print ("Total cells ocean: "+str(oceanCells))
    print ("Total cells never-data: "+str(neverDataCells))
    print ("Total processed gaps:    "+str(totalProcessedGapCells))
    print ("Total gaps too large to fill: "+str(gapsTooBig))
    print ("Total perma-gaps:        "+str(gapsAtUnfillableLocs))
    print ("Total gaps filled fully: "+str(filledToThreshold))
    print ("Total filled partially:  "+str(filledBelowThreshold))
    print ("Total w/insufficient prs:"+str(insufficientPairsFound))
    print ("Total with no nbr pairs: "+str(noPairsFound))
    print ("Total yrs scanned f/b:   "+str(scannedLevels))
    print ("Total nbr cells scanned: "+str(scannedNeighbours))
    print ("Total used nbr pairs:    "+str(usedNeighbours))
    
    objRes = {
        "TotalCells":totalCells,
        "Ocean":oceanCells,
        "NeverData":neverDataCells,
        "GoodCells":dataGood,
        "TotalGaps":totalProcessedGapCells,
        "PermaGaps":gapsAtUnfillableLocs,
        "FilledFull":filledToThreshold,
        "FilledPartial":filledBelowThreshold,
        "FailInsufficientPairs":insufficientPairsFound,
        "FailNoPairs":noPairsFound,
        "TotalAlternateYrs":scannedLevels,
        "TotalNbrsChecked":scannedNeighbours,
        "TotalNbrsUsed":usedNeighbours
    }
    #return (output,flags)# and dists!
    return (outputData,outputDists,outputFlags,objRes)

## A2 core algorithm

In [None]:
%%cython --compile-args=/openmp --link-args=/openmp --force
#%%cython --compile-args=-fopenmp --link-args=-fopenmp --force
import numpy as np
cimport cython 
from libc.math cimport sqrt
#cdef int _SEARCH_RADIUS = 10
from cython.parallel import prange, parallel

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
cpdef fillGapsA2_Cy (
            dict DataImages,
            dict FlagValues,          
            float _NDV,
            Py_ssize_t _A2_MAX_NBRS,
            char FillByRatios = 0,
            float RatioAbsZeroPoint = 0,
            float RatioLimit = 1
            ):
    '''
    Cython (C) implementation of the A2 gapfilling algorithm main "pass" code. This function should be called 8 
    times with the data structured in such a way that the 8 different directional pass fills are generated, in 
    order to generate overall A2 fill results. A separate (python) function is provided for this.
    
    The nature of the A2 algorithm is such that it cannot be easily parallelised - it modifies the input based 
    on results from neighbouring cells, so the cells must be run in a deterministic order. 
    
    Likewise A2 "drags" fill value ratios / calculations for an unlimited distance from nearest data pixels. This 
    means that it cannot be run on separate tiles like A1 and must be run on global images.
    
    These two factors mean that unlike in the published paper, A2 is both slower and more demanding of memory than A1,
    whilst producing less good results. 
    
    However it is still required if we need to be assured that _all_ gaps will be filled because A1 only works out to 
    a specified radius (gap size). (In future we may try iterative / repeated running of A1 until all gaps are filled)
    
    This implementation is optimised as far as it can be in terms of Cython optimisations, except for the approach taken 
    to the 8 different directional passes. To reduce memory use, the caller function (elsewhere) does not not make a 
    C-ordered copy of the data but re-strides it. This greatly slows the A2 function (by a factor of around 6) and so 
    on a machine with sufficent memory the data should be copied into the right order before passing to this function.
    '''
    # We assume that the arrays passed in are strided / transposed 
    # such that iterating through in the standard c-normal order 
    # will go through the source data in the correct order for this 
    # directional pass. This means that some passes (the ones where the data 
    # are in the native order) are several times quicker than the others 
    cdef:
        # intermediate arrays
        double [:,::1] nbrTable
        int[:,::1] nbrIntCoords
        
        float[:,::1] diffImage_Local
        float [:, :] origDistImage_LocalCopy
        # in loop working vars
        Py_ssize_t y, x, yShape, xShape, nbrIndex, xNbr, yNbr
        double nbrDiffSum, nbrDiffCount, nbrDistSum, diffValThisPass
        double local_distance, nbrDist
        
        # metrics
        long long gotPixelVals = 0
        
        # unpack inputs to typed variables 
        float[:,:] dataImage_Global_R = DataImages["Data"] # global in both senses
        unsigned char[:,:] flagsImage_Global_R = DataImages["Flags"]
        float[:,:] origDistImage_Global_R = DataImages["Distances"]
        float[:,:] meanImage_Global_R = DataImages["Means"]
        # these inputs get modified, i.e. they are "out" parameters in a proper language
        # It is done like this rather than having return values because they are actually going to be
        # strided views on arrays (to change iteration order)
        float[:,:] sumDistImage_Global_W = DataImages["SumDist"] 
        float[:,:] outputImage_ThisPass_W = DataImages["Output"]
        char _FILL_FAILED_FLAG = FlagValues["FAILURE"]
        char _OCEAN_FLAG = FlagValues["OCEAN"]
        
        float _AbsZeroPoint = RatioAbsZeroPoint
        float _MaxAllowableRatio = RatioLimit
        float _MinAllowableRatio = 1.0 / _MaxAllowableRatio
        
    yShape = dataImage_Global_R.shape[0]
    xShape = dataImage_Global_R.shape[1]
            
    # it's actually the max neigbours value that defines how far out the search runs. 
    # this just makes sure that the nbr table is generated far enough out.
    _SEARCH_RADIUS = <int>sqrt(_A2_MAX_NBRS / 3.14) + 10
    diam = _SEARCH_RADIUS * 2 + 1
    inds = np.indices([diam,diam]) - _SEARCH_RADIUS
    distTmp = np.sqrt((inds ** 2).sum(0))
    npTmpTable = ((inds.T).reshape(diam**2, 2))
    npTmpTable = np.append(npTmpTable, distTmp.ravel()[:,None],axis=1)

    # sort the table by distance then x then y (the arguments are last-sort-first)
    order = np.lexsort((npTmpTable[:,1],npTmpTable[:,0],npTmpTable[:,2]))
    npTmpTable = np.take(npTmpTable,order,axis=0)

    # the C-side result of the distance table calculations
    # transpose it to have three rows and many columns and take a C contiguous copy of this
    # so that access to individual nbr coord sets is optimised
    nbrTable = np.copy((npTmpTable[npTmpTable[:,2] <= _SEARCH_RADIUS]).T,order='c')
    # the distance table is stored with a float type but we need ints for indexing
    # based on its coords. we can cast at the time we get them out, but as this happens 
    # in the innermost loop it is done trillions of times and so the time penalty of that 
    # becomes signficant. So, store an int version of the coords array
    nbrIntCoords = np.asarray(nbrTable[0:2,:]).astype(np.int32)
    
    print ("Beginning pass of A2...")
    # populate the ratio or difference image, 
    # this is just (data / mean) or (data - mean) (whether the data are original or from A1)
    diffImage_Local = np.empty_like(dataImage_Global_R)
    diffImage_Local[:] = _NDV
    with nogil, parallel(num_threads=20):
        #outerIdx = -1
        for y in prange(0, yShape):
            x = -1
            for x in range(0, xShape):
                if ((flagsImage_Global_R[y, x] & _OCEAN_FLAG) == _OCEAN_FLAG
                    # or meanImage_Global_R[y, x] == 0 # we need to be able to cope with mean = 0
                    or meanImage_Global_R[y, x] == _NDV
                    or dataImage_Global_R[y, x] == _NDV
                    ):
                    continue
                if FillByRatios == 0:    
                    diffImage_Local[y, x] = (dataImage_Global_R[y, x] - meanImage_Global_R[y, x])
                else:
                    diffImage_Local[y, x] =((dataImage_Global_R[y, x] - _AbsZeroPoint)
                        / (meanImage_Global_R[y, x] - _AbsZeroPoint))
                    if diffImage_Local[y, x] > _MaxAllowableRatio:
                        diffImage_Local[y, x] = _MaxAllowableRatio
                    elif diffImage_Local[y, x] < _MinAllowableRatio:
                        diffImage_Local[y, x] = _MinAllowableRatio
    
                
    # the distances image gets modified within a pass but we don't want / need
    # to store it outside of the pass
    # However we are now creating the fresh copy in the caller function for clarity
    #origDistImage_LocalCopy = np.copy(origDistImage_Global_R)
    origDistImage_LocalCopy = origDistImage_Global_R
    
    with nogil:
        for y in range (0, yShape): # can't do parallel here,  boooo
            for x in range (0, xShape):
                #flag = flagsImage[y,x]
                if (flagsImage_Global_R[y, x] & _FILL_FAILED_FLAG) != _FILL_FAILED_FLAG:
                    # it's already good data, or filled with A1
                    continue
                if meanImage_Global_R[y,x] == _NDV:
                    #we can't fill, but, if we are here then the flag is already set
                    #to failure (from A1)... could optionally set a separate A2 failure flag (128)
                    continue
                # else...
                nbrDiffSum = 0
                nbrDiffCount = 0
                nbrDistSum = 0
                # summarise the values in the surrounding 8 pixels 
                # we cannot precalculate this elementwise because the 
                # offsets / ratios in diffImageThisPass are updated in the loop,
                # affecting later iterations, hence why we can't parallelise simply
                for nbrIndex in range(1, _A2_MAX_NBRS+1):
                    # +1 because the first row of nbr table is the current cell
                    # this was an attempt at loop reversal so we could always keep it c-contiguous - 
                    # specify the order of the inner / outer loop as a parameter. Never got it working yet...
                    #if outerLoopShouldBe == 0: 
                    xNbr = x + nbrIntCoords[0, nbrIndex]
                    yNbr = y + nbrIntCoords[1, nbrIndex]
                    if (xNbr >= 0 and xNbr < xShape and
                            yNbr >= 0 and yNbr < yShape and
                            diffImage_Local[yNbr, xNbr] != _NDV):
                        nbrDiffSum += diffImage_Local[yNbr, xNbr]
                        nbrDiffCount += 1
                        if origDistImage_LocalCopy[yNbr, xNbr] != _NDV:
                            nbrDist = origDistImage_LocalCopy[yNbr, xNbr] 
                            # the distance of the filled cell will be the average of the distances
                            # of the neighbour cells used. Where "distance" of a neighbour cell 
                            # will be the physical distance from the working cell, plus the 
                            # distance already associated with the filled value in the nbr cell
                            # from the A1 algorithm, if applicable.
                            nbrDistSum += nbrDist
                        nbrDistSum += nbrTable[2, nbrIndex]
                        
                # if any of the surrounding 8 pixels had a value then derive the cell 
                # value from it
                if nbrDiffCount > 0 and (FillByRatios == 0 or nbrDiffSum > 0):
                    gotPixelVals += 1
                    # ratio / diff to use is the mean of the (up to) 8 values surrounding the cell
                    diffValThisPass = nbrDiffSum / nbrDiffCount
                    # fill in the same ratio image that we are checking in the neigbour search
                    # step such that the next cell along, in the current pass direction and if 
                    # it's a gap, will have this calculated value as an available input.
                    # Hence values are "smeared" in direction of the scan.
                    # This means loop iterations are not independent (not embarrasingly 
                    # parallel!) and thus this algorithm needs to be run on an entire 
                    # global image. 
                    diffImage_Local[y, x] = diffValThisPass
                    if FillByRatios == 0:
                        outputImage_ThisPass_W[y, x] = (diffValThisPass + meanImage_Global_R[y, x])
                    else:
                        outputImage_ThisPass_W[y, x] = (diffValThisPass * 
                                                        (meanImage_Global_R[y, x] - _AbsZeroPoint) 
                                                    + _AbsZeroPoint)
                else:
                    outputImage_ThisPass_W[y, x] = _NDV
                    # outside the function we can fill in flags image with _UNFILLABLE_AT_A2
                    # if all passes are nodata in finished image pass stack
    # nothing is returned, as the ratio and dist per-pass images are modified in-place
    print ("A2 filled {0!s} locations on this pass".format(gotPixelVals))


### Clip values

In [None]:
%%cython --compile-args=/openmp --link-args=/openmp --force
#%%cython --compile-args=-fopenmp --link-args=-fopenmp --force 
import numpy as np
cimport cython 
from libc.math cimport sqrt
#cdef int _SEARCH_RADIUS = 10
from cython.parallel import prange, parallel


@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
cpdef MinMaxClip(float[:,::1] dataImage, 
                 unsigned char[:,::1] flagsImage, 
                 float[:,::1] meansImage, 
                 float[:,::1] stdImage, 
                 unsigned char flagToCheck, 
                 unsigned char flagToSet,
                 float floor_ceiling_value,
                 float _NDV,
                 float upperHardLimit,
                 float lowerHardLimit
                 ):
    '''
    Clips (clamps) an image to not exceed +- n std from the mean image, or a hard upper / lower limit
    
    '''
    cdef:
        Py_ssize_t x, y, xShape, yShape
        float value, minAllowed, maxAllowed
        #char localFlag
        float negInf, posInf
        
    yShape = dataImage.shape[0]
    xShape = dataImage.shape[1]
    assert xShape == meansImage.shape[1]
    assert xShape == stdImage.shape[1]
    assert xShape == flagsImage.shape[1]
    assert yShape == meansImage.shape[0]
    assert yShape == stdImage.shape[0]
    assert yShape == flagsImage.shape[0]
    
    posInf = np.inf
    negInf = -np.inf
      
    #with nogil, parallel(num_threads=20):
    if 1:
        for y in range(yShape):
            value = -1
            maxAllowed = posInf
            minAllowed = negInf
            for x in range(xShape):
                if (flagsImage[y, x] & flagToCheck) != flagToCheck:
                    continue
                value = dataImage[y,x]
                if value == _NDV:
                    continue
                maxAllowed = meansImage[y, x] + (floor_ceiling_value * stdImage[y, x])
                minAllowed = meansImage[y, x] - (floor_ceiling_value * stdImage[y, x])
                if maxAllowed>=200.0:
                    print ("Whoops! Location {0!s},{1!s} had value {2!s}. Mean={3!s} and std={4!s} giving maxallowed of {5!s}"
                           .format(x,y,value,meansImage[y,x],stdImage[y,x],maxAllowed)
                    )
                    # crash
                    assert False
                if minAllowed<=-200.0:
                    print ("Whoops! Location {0!s},{1!s} had value {2!s}. Mean={3!s} and std={4!s} giving minallowed of {5!s}"
                           .format(x,y,value,meansImage[y,x],stdImage[y,x],minAllowed)
                    )
                    # crash
                    assert False
                if maxAllowed > upperHardLimit:
                    maxAllowed = upperHardLimit
                if minAllowed < lowerHardLimit:
                    minAllowed = lowerHardLimit
                
                if value > maxAllowed:
                    dataImage[y, x] = maxAllowed
                    flagsImage[y, x] = flagsImage[y, x] | flagToSet
                    continue
                if value < minAllowed:
                    dataImage[y, x] = minAllowed
                    flagsImage[y, x] = flagsImage[y, x] | flagToSet
                    continue

cpdef MinMaxClip3D(float[:,:,::1] dataImage, 
                 unsigned char[:,:,::1] flagsImage, 
                 float[:,::1] meansImage, 
                 float[:,::1] stdImage, 
                 unsigned char flagToCheck, 
                 unsigned char flagToSet,
                 float floor_ceiling_value,
                 float _NDV,
                 float upperHardLimit,
                 float lowerHardLimit
                 ):
    '''
    Clips / clamps a stack of images to not exceed +- n stds from a mean image or a hard upper/lower limit
    '''
    cdef:
        Py_ssize_t zSize, z
    zSize = dataImage.shape[0]
    assert zSize == flagsImage.shape[0]
    for z in range(zSize):
        MinMaxClip(dataImage[z], flagsImage[z], 
                   meansImage, stdImage, 
                   flagToCheck, flagToSet, floor_ceiling_value, _NDV, upperHardLimit, lowerHardLimit)